• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.06
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "dynqueryctrl.h"
29

30
int64_t gSessionId = 0;
31

UNCOV
32
void freeVgTableList(void* ptr) { 
×
UNCOV
33
  taosArrayDestroy(*(SArray**)ptr); 
×
UNCOV
34
}
×
35

36
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
11,002✔
37
  SStbJoinTableList* pNext = NULL;
11,002✔
38
  
39
  while (pListHead) {
11,003✔
40
    taosMemoryFree(pListHead->pLeftVg);
1✔
41
    taosMemoryFree(pListHead->pLeftUid);
1✔
42
    taosMemoryFree(pListHead->pRightVg);
1✔
43
    taosMemoryFree(pListHead->pRightUid);
1✔
44
    pNext = pListHead->pNext;
1✔
45
    taosMemoryFree(pListHead);
1✔
46
    pListHead = pNext;
1✔
47
  }
48
}
11,002✔
49

50
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
11,002✔
51
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
11,002✔
52
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
53
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
54

55
  if (pStbJoin->basic.batchFetch) {
11,002✔
56
    if (pStbJoin->ctx.prev.leftHash) {
10,990✔
57
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
10,690✔
58
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
10,690✔
59
    }
60
    if (pStbJoin->ctx.prev.rightHash) {
10,990✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
10,690✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
10,690✔
63
    }
64
  } else {
65
    if (pStbJoin->ctx.prev.leftCache) {
12!
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
12✔
67
    }
68
    if (pStbJoin->ctx.prev.rightCache) {
12!
69
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
12✔
70
    }
71
    if (pStbJoin->ctx.prev.onceTable) {
12!
72
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
12✔
73
    }
74
  }
75

76
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
11,002✔
77
}
11,002✔
78

79
static void destroyDynQueryCtrlOperator(void* param) {
11,002✔
80
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
11,002✔
81

82
  switch (pDyn->qType) {
11,002!
83
    case DYN_QTYPE_STB_HASH:
11,002✔
84
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
11,002✔
85
      break;
11,002✔
86
    default:
×
87
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
88
      break;
×
89
  }
90

91
  taosMemoryFreeClear(param);
11,002!
92
}
11,002✔
93

94
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
95
  if (batchFetch) {
69,006✔
96
    return true;
68,958✔
97
  }
98
  
99
  if (rightTable) {
48!
100
    return pPost->rightCurrUid == pPost->rightNextUid;
24✔
101
  }
102

103
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
24✔
104

105
  return (NULL == num) ? false : true;
24✔
106
}
107

108
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
34,503✔
109
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
34,503✔
110
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
34,503✔
111
  SStbJoinTableList*         pNode = pPrev->pListHead;
34,503✔
112
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
34,503✔
113
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
34,503✔
114
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
34,503✔
115
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
34,503✔
116
  int64_t                    readIdx = pNode->readIdx + 1;
34,503✔
117
  int64_t                    rightPrevUid = pPost->rightCurrUid;
34,503✔
118

119
  pPost->leftCurrUid = *leftUid;
34,503✔
120
  pPost->rightCurrUid = *rightUid;
34,503✔
121

122
  pPost->leftVgId = *leftVgId;
34,503✔
123
  pPost->rightVgId = *rightVgId;
34,503✔
124

125
  while (true) {
126
    if (readIdx < pNode->uidNum) {
34,503✔
127
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
34,192✔
128
      break;
34,192✔
129
    }
130
    
131
    pNode = pNode->pNext;
311✔
132
    if (NULL == pNode) {
311!
133
      pPost->rightNextUid = 0;
311✔
134
      break;
311✔
135
    }
136
    
137
    rightUid = pNode->pRightUid;
×
138
    readIdx = 0;
×
139
  }
140

141
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
34,503✔
142
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
34,503✔
143

144
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
34,503!
145
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
146
    pStbJoin->execInfo.rightCacheNum++;
×
147
  }  
148

149
  return TSDB_CODE_SUCCESS;
34,503✔
150
}
151

152

153
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
69,006✔
154
  int32_t code = TSDB_CODE_SUCCESS;
69,006✔
155
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
69,006✔
156
  if (NULL == *ppRes) {
69,006!
157
    code = terrno;
×
158
    freeOperatorParam(pChild, OP_GET_PARAM);
×
159
    return code;
×
160
  }
161
  if (pChild) {
69,006✔
162
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
648✔
163
    if (NULL == (*ppRes)->pChildren) {
648!
164
      code = terrno;
×
165
      freeOperatorParam(pChild, OP_GET_PARAM);
×
166
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
167
      *ppRes = NULL;
×
168
      return code;
×
169
    }
170
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
1,296!
171
      code = terrno;
×
172
      freeOperatorParam(pChild, OP_GET_PARAM);
×
173
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
174
      *ppRes = NULL;
×
175
      return code;
×
176
    }
177
  } else {
178
    (*ppRes)->pChildren = NULL;
68,358✔
179
  }
180

181
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
69,006✔
182
  if (NULL == pGc) {
69,006!
183
    code = terrno;
×
184
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
185
    *ppRes = NULL;
×
186
    return code;
×
187
  }
188

189
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
69,006✔
190
  pGc->downstreamIdx = downstreamIdx;
69,006✔
191
  pGc->vgId = vgId;
69,006✔
192
  pGc->tbUid = tbUid;
69,006✔
193
  pGc->needCache = needCache;
69,006✔
194

195
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
69,006✔
196
  (*ppRes)->downstreamIdx = downstreamIdx;
69,006✔
197
  (*ppRes)->value = pGc;
69,006✔
198

199
  return TSDB_CODE_SUCCESS;
69,006✔
200
}
201

202

203
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
204
  int32_t code = TSDB_CODE_SUCCESS;
×
205
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
206
  if (NULL == *ppRes) {
×
207
    return terrno;
×
208
  }
209
  (*ppRes)->pChildren = NULL;
×
210

211
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
212
  if (NULL == pGc) {
×
213
    code = terrno;
×
214
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
215
    return code;
×
216
  }
217

218
  pGc->downstreamIdx = downstreamIdx;
×
219
  pGc->vgId = vgId;
×
220
  pGc->tbUid = tbUid;
×
221

222
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
223
  (*ppRes)->downstreamIdx = downstreamIdx;
×
224
  (*ppRes)->value = pGc;
×
225

226
  return TSDB_CODE_SUCCESS;
×
227
}
228

229

230
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
48✔
231
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
48✔
232
  if (NULL == *ppRes) {
48!
233
    return terrno;
×
234
  }
235
  (*ppRes)->pChildren = NULL;
48✔
236
  
237
  SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
48✔
238
  if (NULL == pExc) {
48!
239
    return terrno;
×
240
  }
241

242
  pExc->multiParams = false;
48✔
243
  pExc->basic.vgId = *pVgId;
48✔
244
  pExc->basic.tableSeq = true;
48✔
245
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
48✔
246
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
48✔
247
  if (NULL == pExc->basic.uidList) {
48!
248
    taosMemoryFree(pExc);
×
249
    return terrno;
×
250
  }
251
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
96!
252
    taosArrayDestroy(pExc->basic.uidList);
×
253
    taosMemoryFree(pExc);
×
254
    return terrno;
×
255
  }
256

257
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
48✔
258
  (*ppRes)->downstreamIdx = downstreamIdx;
48✔
259
  (*ppRes)->value = pExc;
48✔
260
  
261
  return TSDB_CODE_SUCCESS;
48✔
262
}
263

264

265
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
598✔
266
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
598✔
267
  if (NULL == *ppRes) {
598!
268
    return terrno;
×
269
  }
270
  (*ppRes)->pChildren = NULL;
598✔
271
  
272
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
598✔
273
  if (NULL == pExc) {
598!
274
    taosMemoryFreeClear(*ppRes);
×
275
    return terrno;
×
276
  }
277

278
  pExc->multiParams = true;
598✔
279
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
598✔
280
  if (NULL == pExc->pBatchs) {
598!
281
    taosMemoryFree(pExc);
×
282
    taosMemoryFreeClear(*ppRes);
×
283
    return terrno;
×
284
  }
285
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
598✔
286
  
287
  SExchangeOperatorBasicParam basic;
288
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
598✔
289

290
  int32_t iter = 0;
598✔
291
  void* p = NULL;
598✔
292
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
1,596✔
293
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
998✔
294
    SArray* pUidList = *(SArray**)p;
998✔
295
    basic.vgId = *pVgId;
998✔
296
    basic.uidList = pUidList;
998✔
297
    basic.tableSeq = false;
998✔
298
    
299
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));   
998!
300

301
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
998!
302
    *(SArray**)p = NULL;
998✔
303
  }
304

305
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
598✔
306
  (*ppRes)->downstreamIdx = downstreamIdx;
598✔
307
  (*ppRes)->value = pExc;
598✔
308
  
309
  return TSDB_CODE_SUCCESS;
598✔
310
}
311

312

313
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
34,503✔
314
  int32_t code = TSDB_CODE_SUCCESS;
34,503✔
315
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
34,503✔
316
  if (NULL == *ppRes) {
34,503!
UNCOV
317
    code = terrno;
×
UNCOV
318
    return code;
×
319
  }
320
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
34,503✔
321
  if (NULL == (*ppRes)->pChildren) {
34,503!
UNCOV
322
    code = terrno;
×
UNCOV
323
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
UNCOV
324
    *ppRes = NULL;
×
UNCOV
325
    return code;
×
326
  }
327
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
69,006!
328
    code = terrno;
×
329
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
330
    *ppRes = NULL;
×
331
    return code;
×
332
  }
333
  *ppChild0 = NULL;
34,503✔
334
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
69,006!
335
    code = terrno;
×
336
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
337
    *ppRes = NULL;
×
338
    return code;
×
339
  }
340
  *ppChild1 = NULL;
34,503✔
341
  
342
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
34,503✔
343
  if (NULL == pJoin) {
34,503!
344
    code = terrno;
×
345
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
346
    *ppRes = NULL;
×
347
    return code;
×
348
  }
349

350
  pJoin->initDownstream = initParam;
34,503✔
351
  
352
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
34,503✔
353
  (*ppRes)->value = pJoin;
34,503✔
354

355
  return TSDB_CODE_SUCCESS;
34,503✔
356
}
357

358

359
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
360
  int32_t code = TSDB_CODE_SUCCESS;
×
361
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
362
  if (NULL == *ppRes) {
×
363
    code = terrno;
×
364
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
365
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
366
    return code;
×
367
  }
368
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
369
  if (NULL == *ppRes) {
×
370
    code = terrno;
×
371
    taosMemoryFreeClear(*ppRes);
×
372
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
373
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
374
    return code;
×
375
  }
376
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
377
    code = terrno;
×
378
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
379
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
380
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
381
    *ppRes = NULL;
×
382
    return code;
×
383
  }
384
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
385
    code = terrno;
×
386
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
387
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
388
    *ppRes = NULL;
×
389
    return code;
×
390
  }
391
  
392
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
393
  (*ppRes)->value = NULL;
×
394

395
  return TSDB_CODE_SUCCESS;
×
396
}
397

398

399

400
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
2✔
401
  int32_t code = TSDB_CODE_SUCCESS;
2✔
402
  int32_t vgNum = tSimpleHashGetSize(pVg);
2✔
403
  if (vgNum <= 0 || vgNum > 1) {
2!
404
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
405
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
406
  }
407

408
  int32_t iter = 0;
2✔
409
  void* p = NULL;
2✔
410
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
4✔
411
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
2✔
412
    SArray* pUidList = *(SArray**)p;
2✔
413

414
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
2✔
415
    if (code) {
2!
416
      return code;
×
417
    }
418
    taosArrayDestroy(pUidList);
2✔
419
    *(SArray**)p = NULL;
2✔
420
  }
421
  
422
  return TSDB_CODE_SUCCESS;
2✔
423
}
424

425

426
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
427
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
428
  if (NULL == pUidList) {
×
429
    return terrno;
×
430
  }
431
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
432
    return terrno;
×
433
  }
434

435
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
436
  taosArrayDestroy(pUidList);
×
437
  if (code) {
×
438
    return code;
×
439
  }
440
  
441
  return TSDB_CODE_SUCCESS;
×
442
}
443

444
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
34,503✔
445
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
34,503✔
446
  SOperatorParam*             pSrcParam0 = NULL;
34,503✔
447
  SOperatorParam*             pSrcParam1 = NULL;
34,503✔
448
  SOperatorParam*             pGcParam0 = NULL;
34,503✔
449
  SOperatorParam*             pGcParam1 = NULL;  
34,503✔
450
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
34,503✔
451
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
34,503✔
452
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
34,503✔
453
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
34,503✔
454
  int32_t                     code = TSDB_CODE_SUCCESS;
34,503✔
455

456
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
34,503✔
457
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
458

459
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
34,503!
460
  
461
  if (pInfo->stbJoin.basic.batchFetch) {
34,503✔
462
    if (pPrev->leftHash) {
34,479✔
463
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
300✔
464
      if (TSDB_CODE_SUCCESS == code) {
300!
465
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
300!
466
      }
467
      if (TSDB_CODE_SUCCESS == code) {
300!
468
        tSimpleHashCleanup(pPrev->leftHash);
300✔
469
        tSimpleHashCleanup(pPrev->rightHash);
300✔
470
        pPrev->leftHash = NULL;
300✔
471
        pPrev->rightHash = NULL;
300✔
472
      }
473
    }
474
  } else {
475
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
24!
476
    if (TSDB_CODE_SUCCESS == code) {
24!
477
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
24!
478
    }
479
  }
480

481
  bool initParam = pSrcParam0 ? true : false;
34,503✔
482
  if (TSDB_CODE_SUCCESS == code) {
34,503!
483
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
34,503✔
484
    pSrcParam0 = NULL;
34,503✔
485
  }
486
  if (TSDB_CODE_SUCCESS == code) {
34,503!
487
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
34,503✔
488
    pSrcParam1 = NULL;
34,503✔
489
  }
490
  if (TSDB_CODE_SUCCESS == code) {
34,503!
491
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
34,503✔
492
  }
493
  if (TSDB_CODE_SUCCESS != code) {
34,503!
UNCOV
494
    if (pSrcParam0) {
×
UNCOV
495
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
496
    }
UNCOV
497
    if (pSrcParam1) {
×
498
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
499
    }
UNCOV
500
    if (pGcParam0) {
×
UNCOV
501
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
502
    }
UNCOV
503
    if (pGcParam1) {
×
UNCOV
504
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
505
    }
UNCOV
506
    if (*ppParam) {
×
507
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
508
      *ppParam = NULL;
×
509
    }
510
  }
511
  
512
  return code;
34,503✔
513
}
514

515

516
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
34,503✔
517
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
34,503✔
518
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
34,503✔
519
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
34,503✔
520
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
34,503✔
521
  SOperatorParam*            pParam = NULL;
34,503✔
522
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
34,503✔
523
  if (TSDB_CODE_SUCCESS != code) {
34,503!
UNCOV
524
    pOperator->pTaskInfo->code = code;
×
UNCOV
525
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
526
  }
527

528
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
34,503✔
529
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
34,503✔
530
  if (*ppRes && (code == 0)) {
34,503!
531
    code = blockDataCheck(*ppRes);
698✔
532
    if (code) {
698!
533
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
534
      pOperator->pTaskInfo->code = code;
×
535
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
536
    }
537
    pPost->isStarted = true;
698✔
538
    pStbJoin->execInfo.postBlkNum++;
698✔
539
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
698✔
540
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
698✔
541
  } else {
542
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
33,805✔
543
  }
544
}
34,503✔
545

546

547
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
548
  SOperatorParam* pGcParam = NULL;
×
549
  SOperatorParam* pMergeJoinParam = NULL;
×
550
  int32_t         downstreamId = leftTable ? 0 : 1;
×
551
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
552
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
553

554
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
555

556
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
557
  if (TSDB_CODE_SUCCESS != code) {
×
558
    return code;
×
559
  }
560
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
561
  if (TSDB_CODE_SUCCESS != code) {
×
562
    return code;
×
563
  }
564

565
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
566
}
567

568
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
34,502✔
569
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
34,502✔
570
  int32_t code = 0;
34,502✔
571
  
572
  pPost->isStarted = false;
34,502✔
573
  
574
  if (pStbJoin->basic.batchFetch) {
34,502✔
575
    return TSDB_CODE_SUCCESS;
34,478✔
576
  }
577
  
578
  if (pPost->leftNeedCache) {
24!
579
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
580
    if (num && --(*num) <= 0) {
×
581
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
582
      if (code) {
×
583
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
584
        QRY_ERR_RET(code);
×
585
      }
586
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
587
    }
588
  }
589
  
590
  if (!pPost->rightNeedCache) {
24!
591
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
24✔
592
    if (NULL != v) {
24!
593
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
594
      if (code) {
×
595
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
596
        QRY_ERR_RET(code);
×
597
      }
598
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
599
    }
600
  }
601

602
  return TSDB_CODE_SUCCESS;
24✔
603
}
604

605

606
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
607
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,009✔
608
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
1,009✔
609
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
1,009✔
610

611
  if (!pPost->isStarted) {
1,009✔
612
    return TSDB_CODE_SUCCESS;
312✔
613
  }
614
  
615
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
697✔
616
  
617
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
697✔
618
  if (NULL == *ppRes) {
697!
619
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
697!
620
    pPrev->pListHead->readIdx++;
697✔
621
  } else {
622
    pInfo->stbJoin.execInfo.postBlkNum++;
×
623
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
624
  }
625

626
  return TSDB_CODE_SUCCESS;
697✔
627
}
628

629
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
630
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
68,966✔
631
  if (NULL == ppArray) {
68,966✔
632
    SArray* pArray = taosArrayInit(10, valSize);
1,000✔
633
    if (NULL == pArray) {
1,000!
634
      return terrno;
×
635
    }
636
    if (NULL == taosArrayPush(pArray, pVal)) {
2,000!
637
      taosArrayDestroy(pArray);
×
638
      return terrno;
×
639
    }
640
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
1,000!
641
      taosArrayDestroy(pArray);      
×
642
      return terrno;
×
643
    }
644
    return TSDB_CODE_SUCCESS;
1,000✔
645
  }
646

647
  if (NULL == taosArrayPush(*ppArray, pVal)) {
135,932!
648
    return terrno;
×
649
  }
650
  
651
  return TSDB_CODE_SUCCESS;
67,966✔
652
}
653

654
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
655
  int32_t code = TSDB_CODE_SUCCESS;
24✔
656
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
24✔
657
  if (NULL == pNum) {
24!
658
    uint32_t n = 1;
24✔
659
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
24✔
660
    if (code) {
24!
661
      return code;
×
662
    }
663
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
24✔
664
    if (code) {
24!
665
      return code;
×
666
    }
667
    return TSDB_CODE_SUCCESS;
24✔
668
  }
669

670
  switch (*pNum) {
×
671
    case 0:
×
672
      break;
×
673
    case UINT32_MAX:
×
674
      *pNum = 0;
×
675
      break;
×
676
    default:
×
677
      if (1 == (*pNum)) {
×
678
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
679
        if (code) {
×
680
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
681
          QRY_ERR_RET(code);
×
682
        }
683
      }
684
      (*pNum)++;
×
685
      break;
×
686
  }
687
  
688
  return TSDB_CODE_SUCCESS;
×
689
}
690

691

692
static void freeStbJoinTableList(SStbJoinTableList* pList) {
311✔
693
  if (NULL == pList) {
311!
694
    return;
×
695
  }
696
  taosMemoryFree(pList->pLeftVg);
311✔
697
  taosMemoryFree(pList->pLeftUid);
311✔
698
  taosMemoryFree(pList->pRightVg);
311✔
699
  taosMemoryFree(pList->pRightUid);
311✔
700
  taosMemoryFree(pList);
311✔
701
}
702

703
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
312✔
704
  int32_t code = TSDB_CODE_SUCCESS;
312✔
705
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
312✔
706
  if (NULL == pNew) {
312!
707
    return terrno;
×
708
  }
709
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
312✔
710
  if (NULL == pNew->pLeftVg) {
312!
711
    code = terrno;
×
712
    freeStbJoinTableList(pNew);
×
713
    return code;
×
714
  }
715
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
312✔
716
  if (NULL == pNew->pLeftUid) {
312!
717
    code = terrno;
×
718
    freeStbJoinTableList(pNew);
×
719
    return code;
×
720
  }
721
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
312✔
722
  if (NULL == pNew->pRightVg) {
312!
723
    code = terrno;
×
724
    freeStbJoinTableList(pNew);
×
725
    return code;
×
726
  }
727
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
312✔
728
  if (NULL == pNew->pRightUid) {
312!
729
    code = terrno;
×
730
    freeStbJoinTableList(pNew);
×
731
    return code;
×
732
  }
733

734
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
312✔
735
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
312✔
736
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
312✔
737
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
312✔
738

739
  pNew->readIdx = 0;
312✔
740
  pNew->uidNum = rows;
312✔
741
  pNew->pNext = NULL;
312✔
742
  
743
  if (pCtx->pListTail) {
312!
744
    pCtx->pListTail->pNext = pNew;
×
745
    pCtx->pListTail = pNew;
×
746
  } else {
747
    pCtx->pListHead = pNew;
312✔
748
    pCtx->pListTail= pNew;
312✔
749
  }
750

751
  return TSDB_CODE_SUCCESS;
312✔
752
}
753

754
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
312✔
755
  int32_t                    code = TSDB_CODE_SUCCESS;
312✔
756
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
312✔
757
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
312✔
758
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
312✔
759
  if (NULL == pVg0) {
312!
760
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
761
  }
762
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
312✔
763
  if (NULL == pVg1) {
312!
764
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
765
  }
766
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
312✔
767
  if (NULL == pUid0) {
312!
768
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
769
  }
770
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
312✔
771
  if (NULL == pUid1) {
312!
772
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
773
  }
774

775
  if (pStbJoin->basic.batchFetch) {
312✔
776
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
34,783✔
777
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
34,483✔
778
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
34,483✔
779
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
34,483✔
780
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
34,483✔
781

782
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
34,483✔
783
      if (TSDB_CODE_SUCCESS != code) {
34,483!
784
        break;
×
785
      }
786
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
34,483✔
787
      if (TSDB_CODE_SUCCESS != code) {
34,483!
788
        break;
×
789
      }
790
    }
791
  } else {
792
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
36✔
793
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
24✔
794
    
795
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
24✔
796
      if (TSDB_CODE_SUCCESS != code) {
24!
797
        break;
×
798
      }
799
    }
800
  }
801

802
  if (TSDB_CODE_SUCCESS == code) {
312!
803
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
312✔
804
    if (TSDB_CODE_SUCCESS == code) {
312!
805
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
312✔
806
    }
807
  }
808

809
_return:
×
810

811
  if (TSDB_CODE_SUCCESS != code) {
312!
812
    pOperator->pTaskInfo->code = code;
×
813
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
814
  }
815
}
312✔
816

817

818
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
11,002✔
819
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,002✔
820
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
11,002✔
821

822
  if (pStbJoin->basic.batchFetch) {
11,002✔
823
    return;
11,002✔
824
  }
825

826
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
12!
827
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
12✔
828
    return;
12✔
829
  }
830

831
  uint64_t* pUid = NULL;
×
832
  int32_t iter = 0;
×
833
  int32_t code = 0;
×
834
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
835
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
836
    if (code) {
×
837
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
838
    }
839
  }
840

841
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
842
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
843

844
/*
845
  // debug only
846
  iter = 0;
847
  uint32_t* num = NULL;
848
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
849
    A S S E R T(*num > 1);
850
  }
851
*/  
852
}
853

854
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
11,002✔
855
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,002✔
856
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
11,002✔
857

858
  while (true) {
312✔
859
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
11,314✔
860
    if (NULL == pBlock) {
11,314✔
861
      break;
11,002✔
862
    }
863

864
    pStbJoin->execInfo.prevBlkNum++;
312✔
865
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
312✔
866
    
867
    doBuildStbJoinTableHash(pOperator, pBlock);
312✔
868
  }
869

870
  postProcessStbJoinTableHash(pOperator);
11,002✔
871

872
  pStbJoin->ctx.prev.joinBuild = true;
11,002✔
873
}
11,002✔
874

875
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,009✔
876
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,009✔
877
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,009✔
878
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
1,009✔
879
  SStbJoinTableList*         pNode = pPrev->pListHead;
1,009✔
880

881
  while (pNode) {
35,125✔
882
    if (pNode->readIdx >= pNode->uidNum) {
34,814✔
883
      pPrev->pListHead = pNode->pNext;
311✔
884
      freeStbJoinTableList(pNode);
311✔
885
      pNode = pPrev->pListHead;
311✔
886
      continue;
311✔
887
    }
888
    
889
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
34,503✔
890
    if (*ppRes) {
34,503✔
891
      return TSDB_CODE_SUCCESS;
698✔
892
    }
893

894
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
33,805!
895
    pPrev->pListHead->readIdx++;
33,805✔
896
  }
897

898
  *ppRes = NULL;
311✔
899
  setOperatorCompleted(pOperator);
311✔
900

901
  return TSDB_CODE_SUCCESS;
311✔
902
}
903

904
static FORCE_INLINE void seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
905
  if (pBlock != NULL) {
11,699✔
906
    pBlock->info.id.blockId = pStbJoin->outputBlkId;
698✔
907
  }
908
}
11,699✔
909

910
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
11,988✔
911
  int32_t                    code = TSDB_CODE_SUCCESS;
11,988✔
912
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,988✔
913
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
11,988✔
914

915
  QRY_PARAM_CHECK(pRes);
11,988!
916
  if (pOperator->status == OP_EXEC_DONE) {
11,988✔
917
    return code;
289✔
918
  }
919

920
  int64_t st = 0;
11,699✔
921
  if (pOperator->cost.openCost == 0) {
11,699✔
922
    st = taosGetTimestampUs();
11,002✔
923
  }
924

925
  if (!pStbJoin->ctx.prev.joinBuild) {
11,699✔
926
    buildStbJoinTableList(pOperator);
11,002✔
927
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
11,002✔
928
      setOperatorCompleted(pOperator);
10,690✔
929
      goto _return;
10,690✔
930
    }
931
  }
932

933
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
1,009!
934
  if (*pRes) {
1,009!
935
    goto _return;
×
936
  }
937

938
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
1,009!
939

940
_return:
1,009✔
941
  if (pOperator->cost.openCost == 0) {
11,699✔
942
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
11,002✔
943
  }
944

945
  if (code) {
11,699!
946
    qError("%s failed since %s", __func__, tstrerror(code));
×
947
    pOperator->pTaskInfo->code = code;
×
948
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
949
  } else {
950
    seqStableJoinComposeRes(pStbJoin, *pRes);
11,699✔
951
  }
952
  return code;
11,699✔
953
}
954

955
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
11,002✔
956
  if (batchFetch) {
11,002✔
957
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
10,990✔
958
    if (NULL == pPrev->leftHash) {
10,990!
UNCOV
959
      return terrno;
×
960
    }
961
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
10,990✔
962
    if (NULL == pPrev->rightHash) {
10,990!
963
      return terrno;
×
964
    }
965
  } else {
966
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
967
    if (NULL == pPrev->leftCache) {
12!
968
      return terrno;
×
969
    }
970
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
971
    if (NULL == pPrev->rightCache) {
12!
972
      return terrno;
×
973
    }
974
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
975
    if (NULL == pPrev->onceTable) {
12!
976
      return terrno;
×
977
    }
978
  }
979

980
  return TSDB_CODE_SUCCESS;
11,002✔
981
}
982

983
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
11,002✔
984
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
985
                                       SOperatorInfo** pOptrInfo) {
986
  QRY_PARAM_CHECK(pOptrInfo);
11,002!
987

988
  int32_t                    code = TSDB_CODE_SUCCESS;
11,002✔
989
  __optr_fn_t                nextFp = NULL;
11,002✔
990
  SOperatorInfo*             pOperator = NULL;
11,002✔
991
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
11,002✔
992
  if (pInfo == NULL) {
11,002!
993
    code = terrno;
×
994
    goto _error;
×
995
  }
996

997
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
11,002✔
998
  if (pOperator == NULL) {
11,002!
999
    code = terrno;
×
1000
    goto _error;
×
1001
  }
1002

1003
  pTaskInfo->dynamicTask = pPhyciNode->node.dynamicOp;
11,002✔
1004

1005
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
11,002✔
1006
  if (TSDB_CODE_SUCCESS != code) {
11,002!
1007
    goto _error;
×
1008
  }
1009

1010
  pInfo->qType = pPhyciNode->qType;
11,002✔
1011
  switch (pInfo->qType) {
11,002!
1012
    case DYN_QTYPE_STB_HASH:
11,002✔
1013
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
11,002✔
1014
      pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId;
11,002✔
1015
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
11,002✔
1016
      if (TSDB_CODE_SUCCESS != code) {
11,002!
UNCOV
1017
        goto _error;
×
1018
      }
1019
      nextFp = seqStableJoin;
11,002✔
1020
      break;
11,002✔
1021
    default:
×
1022
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1023
      code = TSDB_CODE_INVALID_PARA;
×
1024
      goto _error;
×
1025
  }
1026

1027
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
11,002✔
1028
                  pInfo, pTaskInfo);
1029

1030
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
11,002✔
1031
                                         NULL, optrDefaultGetNextExtFn, NULL);
1032

1033
  *pOptrInfo = pOperator;
11,002✔
1034
  return TSDB_CODE_SUCCESS;
11,002✔
1035

UNCOV
1036
_error:
×
UNCOV
1037
  if (pInfo != NULL) {
×
UNCOV
1038
    destroyDynQueryCtrlOperator(pInfo);
×
1039
  }
1040

UNCOV
1041
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
UNCOV
1042
  pTaskInfo->code = code;
×
UNCOV
1043
  return code;
×
1044
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc