• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3519

05 Nov 2024 11:19AM UTC coverage: 57.706% (+8.4%) from 49.32%
#3519

push

travis-ci

web-flow
Merge pull request #28652 from taosdata/fix/3_liaohj

refactor: always successfully put the retrieve msg

109445 of 245179 branches covered (44.64%)

Branch coverage included in aggregate %.

187435 of 269288 relevant lines covered (69.6%)

12869818.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.73
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "dynqueryctrl.h"
29

30
/* File-level counter; buildGroupCacheOperatorParam atomically increments it to assign each group cache param its sessionId. */
int64_t gSessionId = 0;
31

32
void freeVgTableList(void* ptr) { 
×
33
  taosArrayDestroy(*(SArray**)ptr); 
×
34
}
×
35

36
/*
 * Free every node of a singly linked SStbJoinTableList chain, including the
 * four per-node arrays (left/right vgroup ids and left/right uids).
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
1,925✔
49

50
/*
 * Tear down the super-table join control state: log the execution counters,
 * clean up the hashes that belong to the active fetch mode, then free the
 * pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum,
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: the three cache hashes are cleaned up as-is.
    if (NULL != pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (NULL != pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (NULL != pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: values are SArray* uid lists, so install the array free callback first.
    if (NULL != pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (NULL != pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
1,925✔
78

79
static void destroyDynQueryCtrlOperator(void* param) {
1,925✔
80
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
1,925✔
81

82
  switch (pDyn->qType) {
1,925!
83
    case DYN_QTYPE_STB_HASH:
1,925✔
84
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,925✔
85
      break;
1,925✔
86
    default:
×
87
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
88
      break;
×
89
  }
90

91
  taosMemoryFreeClear(param);
1,925!
92
}
1,925✔
93

94
/*
 * Decide whether the data of a table should be cached.
 *  - batch fetch: always true;
 *  - right table: true when the current and the next right uid are equal
 *    (`uid` is not consulted in this case);
 *  - left table: true when `uid` has an entry in pPrev->leftCache.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (!rightTable) {
    uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
    return NULL != pRefNum;
  }

  return pPost->rightCurrUid == pPost->rightNextUid;
}
107

108
/*
 * Load the table pair at the head node's readIdx into the post-join context:
 * current left/right uid and vgroup id, the uid of the next right table (0 if
 * there is none), and the cache flags for both sides.
 * In non-batch mode, a right table that needs caching and differs from the
 * previous right uid is also recorded in pPrev->rightCache.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated from tSimpleHashPut.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pScan = pPrev->pListHead;
  int64_t*             pRightUid = pScan->pRightUid + pScan->readIdx;
  int64_t              nextIdx = pScan->readIdx + 1;
  int64_t              prevRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = pScan->pLeftUid[pScan->readIdx];
  pPost->rightCurrUid = *pRightUid;

  pPost->leftVgId = pScan->pLeftVg[pScan->readIdx];
  pPost->rightVgId = pScan->pRightVg[pScan->readIdx];

  // Look ahead for the next right uid, moving to following list nodes when
  // the current node has no entry after readIdx.
  while (nextIdx >= pScan->uidNum) {
    pScan = pScan->pNext;
    if (NULL == pScan) {
      break;
    }
    pRightUid = pScan->pRightUid;
    nextIdx = 0;
  }
  pPost->rightNextUid = (NULL != pScan) ? pScan->pRightUid[nextIdx] : 0;

  pPost->leftNeedCache = tableNeedCache(pPost->leftCurrUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  // tableNeedCache does not consult the uid argument for the right table.
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && prevRightUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
151

152

153
/*
 * Build a get-type operator param for a group cache operator.
 *
 * ppRes         - receives the new param; NULL on failure.
 * downstreamIdx - stored in both the wrapper and the group cache payload.
 * vgId, tbUid   - table identity stored in the payload.
 * needCache     - stored in the payload.
 * pChild        - optional child param; when non-NULL it becomes the only
 *                 element of pChildren. It is freed here if the wrapper
 *                 cannot be allocated or the child cannot be attached.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  pParam->pChildren = NULL;
  if (pChild) {
    pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == pParam->pChildren || NULL == taosArrayPush(pParam->pChildren, &pChild)) {
      // The child was not attached, so it is released separately from the wrapper.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(pParam, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pPayload = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pPayload) {
    code = terrno;
    freeOperatorParam(pParam, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pPayload->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pPayload->downstreamIdx = downstreamIdx;
  pPayload->vgId = vgId;
  pPayload->tbUid = tbUid;
  pPayload->needCache = needCache;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = pPayload;

  return TSDB_CODE_SUCCESS;
}
201

202

203
/*
 * Build a notify-type operator param for a group cache operator.
 *
 * ppRes         - receives the new param; NULL on failure.
 * downstreamIdx - stored in both the wrapper and the notify payload.
 * vgId, tbUid   - table identity stored in the payload.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    // Do not hand a freed pointer back to the caller (matches buildGroupCacheOperatorParam).
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
228

229

230
/*
 * Build a single-table exchange operator param (multiParams = false,
 * tableSeq = true) that asks for table *pUid in vgroup *pVgId.
 *
 * ppRes         - receives the new param; NULL on failure.
 * downstreamIdx - stored in the wrapper.
 *
 * On any failure everything allocated here is released and *ppRes is reset to
 * NULL, so the caller never sees a wrapper whose opType/value were not set.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;  // capture before any free call can touch terrno
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    taosArrayDestroy(pExc->basic.uidList);
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;

  return TSDB_CODE_SUCCESS;
}
263

264

265
/*
 * Build a batch exchange operator param (multiParams = true) from `pVg`, a
 * hash keyed by vgroup id whose values are SArray* uid lists.
 *
 * For every vgroup a basic param is stored in pBatchs, and the uid list
 * pointer is moved out of `pVg` (the source slot is set to NULL) once the
 * insert succeeded.
 *
 * ppRes         - receives the new param; NULL on failure.
 * downstreamIdx - stored in the wrapper.
 *
 * On failure everything allocated here is released and *ppRes is reset to
 * NULL. Returns TSDB_CODE_SUCCESS, the terrno of a failing allocation, or the
 * error returned by tSimpleHashPut.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;  // capture before any free call can touch terrno
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  // Zero-initialised so that no indeterminate bytes are copied into the hash.
  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.tableSeq = false;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // The failing entry's uid list is still referenced by pVg. Lists moved
      // in earlier iterations are released through the free callback installed
      // on pBatchs.
      // NOTE(review): assumes freeExchangeGetBasicOperatorParam destroys uidList - confirm.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      return code;
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;  // the list is now referenced by pBatchs
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;

  return TSDB_CODE_SUCCESS;
}
311

312

313
/*
 * Build a get-type merge join operator param with two children.
 *
 * ppRes     - receives the new param; NULL on failure.
 * initParam - stored as the payload's initDownstream flag.
 * ppChild0, ppChild1 - child params. Each *ppChildN is reset to NULL as soon
 *             as it has been pushed into pChildren; a child that was not
 *             pushed is left untouched for the caller.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pPayload = NULL;
  SOperatorParam*              pParam = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    return code;
  }

  pParam->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == pParam->pChildren) {
    goto _fail;
  }

  if (NULL == taosArrayPush(pParam->pChildren, ppChild0)) {
    goto _fail;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush(pParam->pChildren, ppChild1)) {
    goto _fail;
  }
  *ppChild1 = NULL;

  pPayload = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pPayload) {
    goto _fail;
  }

  pPayload->initDownstream = initParam;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pParam->value = pPayload;

  return TSDB_CODE_SUCCESS;

_fail:
  // Shared failure path: record the error, release the wrapper, clear the out-param.
  code = terrno;
  freeOperatorParam(pParam, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
357

358

359
/*
 * Build a notify-type merge join operator param whose children are the given
 * (optional) child params.
 *
 * ppRes            - receives the new param; NULL on failure.
 * pChild0, pChild1 - child params; a NULL child is simply not added. Children
 *                    that were not attached are freed here on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // Check the array that was just created; *ppRes itself is known to be
  // non-NULL at this point, so testing it could never detect the failure.
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    // pChild0 (if any) is already in pChildren, so only pChild1 is freed separately.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;

  return TSDB_CODE_SUCCESS;
}
397

398

399

400
/*
 * Build a table scan operator param from `pVg`, a hash keyed by vgroup id
 * whose values are SArray* uid lists. Exactly one vgroup is accepted; any
 * other count is rejected with TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * After a successful build the uid list is destroyed and its slot in `pVg`
 * is set to NULL. `downstreamIdx` is not used by this function.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pVg, NULL, &iter); NULL != pEntry; pEntry = tSimpleHashIterate(pVg, pEntry, &iter)) {
    int32_t* pVgId = tSimpleHashGetKey(pEntry, NULL);
    SArray** ppUidList = (SArray**)pEntry;
    (void)pVgId;  // looked up as in the original code, but not needed for the scan param

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
424

425

426
/*
 * Build a table scan operator param for the single table *pUid.
 * A temporary one-element uid list is created for the call to
 * buildTableScanOperatorParam and destroyed again on every path.
 * `downstreamIdx` and `pVgId` are not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failing allocation, or the error
 * returned by buildTableScanOperatorParam.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Release the temporary list; it was leaked on this path before.
    code = terrno;
    taosArrayDestroy(pUidList);
    return code;
  }

  code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
443

444
/*
 * Build the full parameter tree for joining the table pair at the head
 * node's readIdx: optional source params (scan or exchange, batch or single)
 * wrapped in two group cache params, which become the children of one merge
 * join param returned through ppParam.
 *
 * In batch mode the source params are only built while pPrev->leftHash is
 * still present; after a successful build both vgroup hashes are cleaned up
 * and cleared.
 *
 * On failure every param still held by this function is freed and *ppParam is
 * reset to NULL. Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pHead = pPrev->pListHead;
  int64_t            rowIdx = pHead->readIdx;
  int32_t*           leftVg = &pHead->pLeftVg[rowIdx];
  int64_t*           leftUid = &pHead->pLeftUid[rowIdx];
  int32_t*           rightVg = &pHead->pRightVg[rowIdx];
  int64_t*           rightUid = &pHead->pRightUid[rowIdx];
  SOperatorParam*    pSrcParam0 = NULL;
  SOperatorParam*    pSrcParam1 = NULL;
  SOperatorParam*    pGcParam0 = NULL;
  SOperatorParam*    pGcParam1 = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64,
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // One source param per side, each addressing a single table.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode: the vgroup hashes are consumed once, then dropped.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  bool initParam = (NULL != pSrcParam0);
  if (TSDB_CODE_SUCCESS == code) {
    // The group cache builder takes over the source param, also when it fails.
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    pSrcParam0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still owned here.
  if (pSrcParam0) {
    freeOperatorParam(pSrcParam0, OP_GET_PARAM);
  }
  if (pSrcParam1) {
    freeOperatorParam(pSrcParam1, OP_GET_PARAM);
  }
  if (pGcParam0) {
    freeOperatorParam(pGcParam0, OP_GET_PARAM);
  }
  if (pGcParam1) {
    freeOperatorParam(pGcParam1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
514

515

516
/*
 * Start the join for the current table pair: build the parameter tree and
 * request the first result block from downstream operator 1 through
 * getNextExtFn.
 *
 * A failure while building the params is stored in the task info and raised
 * with T_LONG_JMP. When a block is returned with code 0, the post-join
 * context is marked as started and the post counters are updated.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pInfo, &pStbJoin->ctx.prev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);

  if ((NULL == *ppRes) || (0 != code)) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  blockDataCheck(*ppRes, false);
  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
7,250✔
540

541

542
/*
 * Notify downstream operator 1 that caching of the current left or right
 * table has ended. A group cache notify param is built for the chosen side
 * and wrapped as the only child of a merge join notify param, which is then
 * passed to optrDefaultNotifyFn.
 *
 * Returns the first build error, otherwise the result of optrDefaultNotifyFn.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId = 1;
  int32_t         vgId = pPost->rightVgId;
  int64_t         uid = pPost->rightCurrUid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
562

563
/*
 * Bookkeeping after the join of the current table pair has produced its last
 * block. The post context is marked as not started; batch mode needs nothing
 * more.
 *
 * Sequential mode:
 *  - left table flagged for caching: its counter in leftCache is decremented
 *    and, when it reaches zero, the entry is removed and a cache-end
 *    notification is sent;
 *  - right table not flagged for caching but present in rightCache: the entry
 *    is removed and a cache-end notification is sent.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from the hash or notify calls.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRemain) {
      --(*pRemain);
      if (0 == *pRemain) {
        code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (!pPost->rightNeedCache &&
      NULL != tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid))) {
    code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (code) {
      qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
      QRY_ERR_RET(code);
    }
    QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
  }

  return TSDB_CODE_SUCCESS;
}
599

600

601
/*
 * Continue the join that is already in progress, if any.
 * When the post context is not started nothing happens. Otherwise the next
 * block is fetched from downstream 1: a block updates the post counters; no
 * block means the current pair is finished, so the end-of-retrieve handling
 * runs and the head node's readIdx advances.
 *
 * Returns TSDB_CODE_SUCCESS or the error from handleSeqJoinCurrRetrieveEnd.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = &pInfo->stbJoin;

  if (!pStbJoin->ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pStbJoin->execInfo.postBlkNum++;
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
  pStbJoin->ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
623

624
/*
 * Append `pVal` (valSize bytes) to the array stored under `pKey` in `pHash`.
 * When the key has no entry yet, a new array holding `pVal` is created and
 * its pointer is stored in the hash.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation, push or hash
 * insert fails.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppExisting = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL != ppExisting) {
    if (NULL == taosArrayPush(*ppExisting, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  // First value for this key: create the list and register it.
  SArray* pFresh = taosArrayInit(10, valSize);
  if (NULL == pFresh) {
    return terrno;
  }

  if (NULL == taosArrayPush(pFresh, pVal)) {
    taosArrayDestroy(pFresh);
    return terrno;
  }

  if (tSimpleHashPut(pHash, pKey, keySize, &pFresh, POINTER_BYTES)) {
    taosArrayDestroy(pFresh);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
648

649
/*
 * Count one more occurrence of `pKey` in `pHash` (uint32_t counters) and keep
 * `pOnceHash` in step as the set of keys seen exactly once.
 *
 *  - new key: counter starts at 1 and the key is added to pOnceHash;
 *  - counter 0: left unchanged;
 *  - counter UINT32_MAX: reset to 0;
 *  - otherwise: incremented; on the 1 -> 2 step the key is removed from
 *    pOnceHash first.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing hash operation.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCount) {
    // Second occurrence: the key no longer belongs to the seen-once set.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
685

686

687
/*
 * Free a single SStbJoinTableList node together with its four arrays.
 * The node's pNext is not followed. A NULL node is ignored.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
697

698
/*
 * Append a new node holding copies of `rows` (vgId, uid) pairs for the left
 * and right side to the tail of pCtx's table list.
 *
 * The four input arrays are copied; the caller keeps ownership of them.
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation, in which
 * case the partially built node is freed and the list is left unchanged.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _fail;
  }

  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _fail;
  }

  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _fail;
  }

  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _fail;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  // Link at the tail; an empty list gets the node as its head too.
  if (pCtx->pListTail) {
    pCtx->pListTail->pNext = pNode;
  } else {
    pCtx->pListHead = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_fail:
  // The node came from calloc, so arrays not yet allocated are still NULL.
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
748

749
/*
 * Consume one block of (left vgId, left uid, right vgId, right uid) rows.
 *
 * Batch mode: every uid is appended to the per-vgroup list in leftHash or
 * rightHash. Sequential mode: every left uid is counted in leftCache and
 * onceTable. Afterwards the rows are appended to the pending table list and
 * tableNum is increased.
 *
 * Any error (including a missing column) is stored in the task info and
 * raised with T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SColumnInfoData*           pVg0 = NULL;
  SColumnInfoData*           pVg1 = NULL;
  SColumnInfoData*           pUid0 = NULL;
  SColumnInfoData*           pUid1 = NULL;

  pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode only tracks how often each left table occurs.
    for (int32_t i = 0; (TSDB_CODE_SUCCESS == code) && (i < pBlock->info.rows); ++i) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, leftUid, sizeof(*leftUid));
    }
  } else {
    // Batch mode groups the uids of both sides by vgroup.
    for (int32_t i = 0; (TSDB_CODE_SUCCESS == code) && (i < pBlock->info.rows); ++i) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);

      code = addToJoinVgroupHash(pPrev->leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pPrev->tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
101✔
811

812

813
/*
 * Finalize the left-table cache once every upstream block has been hashed.
 *
 * Nothing is done in batch-fetch mode. Otherwise:
 *   - when leftCache and onceTable hold the same number of entries, leftCache
 *     is emptied;
 *   - else every entry iterated from onceTable is removed from leftCache, and
 *     the remaining leftCache size is recorded in execInfo.leftCacheNum.
 *
 * NOTE(review): onceTable presumably tracks uids referenced exactly once, so
 * that only uids referenced more than once stay cached - confirm against
 * addToJoinTableHash.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrev->leftCache) == tSimpleHashGetSize(pPrev->onceTable)) {
    tSimpleHashClear(pPrev->leftCache);
    return;
  }

  // The pointer handed back by the iterator is used directly as the 8-byte key to drop from leftCache.
  int32_t iter = 0;
  for (uint64_t* pOnceUid = tSimpleHashIterate(pPrev->onceTable, NULL, &iter); NULL != pOnceUid;
       pOnceUid = tSimpleHashIterate(pPrev->onceTable, pOnceUid, &iter)) {
    int32_t code = tSimpleHashRemove(pPrev->leftCache, pOnceUid, sizeof(*pOnceUid));
    if (code) {
      // A failed removal is only logged; processing continues with the next entry.
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrev->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrev->leftCache));
}
848

849
/*
 * Drain downstream 0 and build the join table list / hashes from its blocks.
 *
 * Each block received bumps execInfo.prevBlkNum, adds its row count to
 * execInfo.prevBlkRows and is handed to doBuildStbJoinTableHash (which raises
 * errors via T_LONG_JMP rather than returning them). When the downstream
 * yields NULL the hashes are post-processed and ctx.prev.joinBuild is set so
 * the build phase is not repeated.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SSDataBlock*               pBlk = NULL;

  while (NULL != (pBlk = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlk->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlk);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
1,925✔
869

870
/*
 * Walk the pending table list and launch retrieves until one produces a block.
 *
 * List nodes whose readIdx has reached uidNum are unlinked from the head and
 * freed. For the remaining head node a new retrieve is launched; if it yields
 * a block (*ppRes != NULL) the function returns immediately. Otherwise the
 * current retrieve is closed via handleSeqJoinCurrRetrieveEnd and the head's
 * readIdx is advanced.
 *
 * When the list is exhausted *ppRes is set to NULL and the operator is marked
 * completed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated by QRY_ERR_RET from
 * handleSeqJoinCurrRetrieveEnd.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pCur = pPrev->pListHead;

  while (NULL != pCur) {
    if (pCur->readIdx < pCur->uidNum) {
      // NOTE(review): if seqJoinLaunchNewRetrieveImpl returns a status it is discarded here - confirm.
      seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
      if (NULL != *ppRes) {
        return TSDB_CODE_SUCCESS;
      }

      QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
      pPrev->pListHead->readIdx++;
    } else {
      // Head node fully consumed: unlink it, release it, and continue with the new head.
      pPrev->pListHead = pCur->pNext;
      freeStbJoinTableList(pCur);
      pCur = pPrev->pListHead;
    }
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
898

899
/*
 * Stamp a result block with this operator's output block id.
 * A NULL block is ignored.
 */
static FORCE_INLINE void seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return;
  }

  pBlock->info.id.blockId = pStbJoin->outputBlkId;
}
2,258✔
904

905
/*
 * Get-next entry point of the sequential super-table join controller.
 *
 * pOperator  operator whose info is a SDynQueryCtrlOperatorInfo
 * pRes       out: next result block; validated by QRY_PARAM_CHECK on entry
 *
 * Flow:
 *   1. Return at once if the operator is already in OP_EXEC_DONE.
 *   2. On the first call, build the join table list from downstream 0; if no
 *      rows were seen the operator is marked completed.
 *   3. Try to continue the retrieve in progress; if that yields nothing,
 *      launch new retrieves.
 *
 * While cost.openCost is still 0, the elapsed time of the call is stored in it
 * (microsecond timestamps divided by 1000.0).
 *
 * On success the result block, if any, is tagged with the output block id and
 * the success code is returned. On failure the code is logged, stored in
 * pTaskInfo->code and raised through T_LONG_JMP.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t startUs = 0;
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      // Build side produced no rows: there is nothing to join.
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    seqStableJoinComposeRes(pStbJoin, *pRes);
  } else {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
949

950
/*
 * Create the hash tables used while building the join table list.
 *
 * pPrev       context receiving the new hash handles
 * batchFetch  true:  create leftHash / rightHash with the INT hash function
 *             false: create leftCache / rightCache / onceTable with the
 *                    BIGINT hash function
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one tSimpleHashInit call
 * returns NULL. Tables created before the failing call are left in pPrev and
 * are not released here.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (batchFetch) {
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (pPrev->leftHash == NULL) {
      return terrno;
    }

    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (pPrev->rightHash == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (pPrev->leftCache == NULL) {
    return terrno;
  }

  pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (pPrev->rightCache == NULL) {
    return terrno;
  }

  pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (pPrev->onceTable == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
977

978
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
1,925✔
979
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
980
                                       SOperatorInfo** pOptrInfo) {
981
  QRY_PARAM_CHECK(pOptrInfo);
1,925!
982

983
  int32_t                    code = TSDB_CODE_SUCCESS;
1,925✔
984
  __optr_fn_t                nextFp = NULL;
1,925✔
985
  SOperatorInfo*             pOperator = NULL;
1,925✔
986
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
1,925✔
987
  if (pInfo == NULL) {
1,925!
988
    code = terrno;
×
989
    goto _error;
×
990
  }
991

992
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,925✔
993
  if (pOperator == NULL) {
1,925!
994
    code = terrno;
×
995
    goto _error;
×
996
  }
997

998
  pTaskInfo->dynamicTask = pPhyciNode->node.dynamicOp;
1,925✔
999

1000
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
1,925✔
1001
  if (TSDB_CODE_SUCCESS != code) {
1,925!
1002
    goto _error;
×
1003
  }
1004

1005
  pInfo->qType = pPhyciNode->qType;
1,925✔
1006
  switch (pInfo->qType) {
1,925!
1007
    case DYN_QTYPE_STB_HASH:
1,925✔
1008
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,925✔
1009
      pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId;
1,925✔
1010
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,925✔
1011
      if (TSDB_CODE_SUCCESS != code) {
1,925!
1012
        goto _error;
×
1013
      }
1014
      nextFp = seqStableJoin;
1,925✔
1015
      break;
1,925✔
1016
    default:
×
1017
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1018
      code = TSDB_CODE_INVALID_PARA;
×
1019
      goto _error;
×
1020
  }
1021

1022
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
1,925✔
1023
                  pInfo, pTaskInfo);
1024

1025
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
1,925✔
1026
                                         NULL, optrDefaultGetNextExtFn, NULL);
1027

1028
  *pOptrInfo = pOperator;
1,925✔
1029
  return TSDB_CODE_SUCCESS;
1,925✔
1030

1031
_error:
×
1032
  if (pInfo != NULL) {
×
1033
    destroyDynQueryCtrlOperator(pInfo);
×
1034
  }
1035

1036
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1037
  pTaskInfo->code = code;
×
1038
  return code;
×
1039
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc