• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.65
/source/libs/executor/src/groupcacheoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "groupcache.h"
29

30

31
/*
 * Close the cache file behind pFileInfo (if one is open), destroy the mutex
 * that guards its descriptor, and flag the entry as deleted.
 */
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
  if (!pFileInfo->fd.fd) {
    // Nothing was ever opened for this entry; just mark it.
    pFileInfo->deleted = true;
    return;
  }

  if (taosCloseFile(&pFileInfo->fd.fd) < 0) {
    qError("close group cache file failed, fd:%p, error:%s", pFileInfo->fd.fd, tstrerror(terrno));
  }

  pFileInfo->fd.fd = NULL;
  (void)taosThreadMutexDestroy(&pFileInfo->fd.mutex);

  pFileInfo->deleted = true;
}
553✔
41

42

43
/*
 * Fill pCols from the group-by column list pList.
 *
 * For each column node the slot id, var-data flag and byte width are recorded,
 * and the byte widths are accumulated into pCols->bufSize. When
 * grpColsMayBeNull is set, room for a null bitmap is added as well. A key
 * buffer of bufSize bytes is only allocated when there is more than one column.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails.
 *
 * NOTE(review): bufSize is only incremented here, so this presumably relies on
 * the caller handing in a zeroed pCols -- confirm.
 * NOTE(review): bitMapSize divides by sizeof(int8_t) (i.e. 1), so it reserves
 * one byte per column rather than one bit -- confirm this is intended.
 */
static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) {
  pCols->colNum = LIST_LENGTH(pList);
  pCols->withNull = grpColsMayBeNull;

  pCols->pColsInfo = taosMemoryMalloc(pCols->colNum * sizeof(SGroupColInfo));
  if (NULL == pCols->pColsInfo) {
    return terrno;
  }

  int32_t colIdx = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode*   pCol = (SColumnNode*)pNode;
    SGroupColInfo* pInfo = &pCols->pColsInfo[colIdx];

    pInfo->slot = pCol->slotId;
    pInfo->vardata = IS_VAR_DATA_TYPE(pCol->node.resType.type);
    pInfo->bytes = pCol->node.resType.bytes;

    pCols->bufSize += pCol->node.resType.bytes;
    ++colIdx;
  }

  if (pCols->withNull) {
    pCols->bitMapSize = pCols->colNum / sizeof(int8_t) + ((pCols->colNum % sizeof(int8_t)) ? 1 : 0);
    pCols->bufSize += pCols->bitMapSize;
  }

  if (pCols->colNum <= 1) {
    // A single group column needs no combined key buffer.
    return TSDB_CODE_SUCCESS;
  }

  pCols->pBuf = taosMemoryMalloc(pCols->bufSize);
  if (NULL == pCols->pBuf) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
76

77
/*
 * Emit one debug log line listing how many blocks were fetched from each
 * downstream. Silently does nothing when there are no downstreams, the
 * per-downstream counters were never allocated, or the scratch buffer cannot
 * be allocated.
 */
static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
  if (pGrpCacheOperator->downstreamNum <= 0 || NULL == pGrpCacheOperator->execInfo.pDownstreamBlkNum) {
    return;
  }

  int32_t bufSize = pGrpCacheOperator->downstreamNum * 32 + 100;
  char*   buf = taosMemoryMalloc(bufSize);
  if (NULL == buf) {
    return;
  }

  int32_t offset = tsnprintf(buf, bufSize, "groupCache exec info, downstreamBlkNum:");
  for (int32_t i = 0; i < pGrpCacheOperator->downstreamNum; ++i) {
    if (offset < 0 || offset >= bufSize - 1) {
      // No room left (or a formatting error): stop appending rather than write past the buffer.
      break;
    }
    // Pass only the space that remains after `offset`, not the full buffer size.
    offset += tsnprintf(buf + offset, bufSize - offset, " %" PRId64, pGrpCacheOperator->execInfo.pDownstreamBlkNum[i]);
  }

  qDebug("%s", buf);
  taosMemoryFree(buf);
}
94

95
static void freeSGcSessionCtx(void* p) {
41,886✔
96
  SGcSessionCtx* pSession = p;
41,886✔
97
  if (pSession->semInit) {
41,886!
98
    if (tsem_destroy(&pSession->waitSem) < 0) {
×
99
      qError("tsem_destroy session waitSem failed, error:%s", tstrerror(terrno));
×
100
    }
101
  }
102
}
41,886✔
103

104
static void freeSGroupCacheFileInfo(void* p) {
553✔
105
  SGroupCacheFileInfo* pFileInfo = p;
553✔
106
  if (pFileInfo->deleted) {
553!
107
    return;
×
108
  }
109

110
  removeGroupCacheFile(pFileInfo);
553✔
111
}
112

113
/* Tear down the hash of cache-file entries held by a file cache context. */
static void freeSGcFileCacheCtx(SGcFileCacheCtx* pFileCtx) {
  taosHashCleanup(pFileCtx->pCacheFile);
}
44,140✔
116

117
static void freeSGcVgroupCtx(void* p) {
22,070✔
118
  SGcVgroupCtx* pVgCtx = p;
22,070✔
119
  taosArrayDestroy(pVgCtx->pTbList);
22,070✔
120
  freeSGcFileCacheCtx(&pVgCtx->fileCtx);
22,070✔
121
}
22,070✔
122

123
static void freeGcBlockInList(void* p) {
20,737✔
124
  SSDataBlock** ppBlock = p;
20,737✔
125
  if (*ppBlock) {
20,737!
126
    taosArrayDestroy((*ppBlock)->pDataBlock);
20,737✔
127
    taosMemoryFree(*ppBlock);
20,737!
128
  }
129
}
20,737✔
130

131
/*
 * Release every container owned by one downstream context, in the same order
 * as before: new-group list, group hash, vgroup/table hash, free-block pool,
 * session hashes, and finally the file cache context.
 */
static void freeSGcDownstreamCtx(SGcDownstreamCtx* pCtx) {
  // group bookkeeping
  taosArrayDestroy(pCtx->pNewGrpList);
  taosHashCleanup(pCtx->pGrpHash);
  tSimpleHashCleanup(pCtx->pVgTbHash);

  // pooled base blocks
  taosArrayDestroyEx(pCtx->pFreeBlock, freeGcBlockInList);

  // sessions
  taosHashCleanup(pCtx->pSessions);
  taosHashCleanup(pCtx->pWaitSessions);

  // cache files
  freeSGcFileCacheCtx(&pCtx->fileCtx);
}
22,070✔
141

142
/*
 * Free all downstream contexts of the operator and then the array holding
 * them. No-op when the array was never allocated.
 */
static void destroyGroupCacheDownstreamCtx(SGroupCacheOperatorInfo* pGrpCacheOperator) {
  if (NULL != pGrpCacheOperator->pDownstreams) {
    int32_t idx = 0;
    while (idx < pGrpCacheOperator->downstreamNum) {
      freeSGcDownstreamCtx(&pGrpCacheOperator->pDownstreams[idx]);
      ++idx;
    }

    taosMemoryFree(pGrpCacheOperator->pDownstreams);
  }
}
154

155

156
/*
 * Free the payload buffers of every column in pDataBlock and reset the block
 * to zero rows / zero capacity. For var-data columns the offset array is freed
 * and the varmeta lengths are zeroed; for other columns the null bitmap is
 * freed. The column array and the block struct themselves are kept.
 */
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, i);
    if (NULL == pCol) {
      qError("fail to get %dth col in dataBlock, numOfCols:%d", i, (int32_t)numOfCols);
      continue;
    }

    taosMemoryFreeClear(pCol->pData);

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      taosMemoryFreeClear(pCol->nullbitmap);
      continue;
    }

    taosMemoryFreeClear(pCol->varmeta.offset);
    pCol->varmeta.length = 0;
    pCol->varmeta.allocLen = 0;
  }

  pDataBlock->info.capacity = 0;
  pDataBlock->info.rows = 0;
}
41,179✔
176

177

178

179
/*
 * Destroy the block cache: drop the dirty-block hash, then deep-free every
 * result block still referenced from the read-block hash before dropping that
 * hash too.
 */
static void destroySGcBlkCacheInfo(SGcBlkCacheInfo* pBlkCache) {
  taosHashCleanup(pBlkCache->pDirtyBlk);

  for (void* pIter = taosHashIterate(pBlkCache->pReadBlk, NULL); NULL != pIter;
       pIter = taosHashIterate(pBlkCache->pReadBlk, pIter)) {
    // Each slot stores an SSDataBlock*: release column payloads first, then the block shell.
    blockDataDeepCleanup(*(SSDataBlock**)pIter);
    freeGcBlockInList(pIter);
  }

  taosHashCleanup(pBlkCache->pReadBlk);
}
11,035✔
190

191
static void destroyGroupCacheOperator(void* param) {
11,035✔
192
  SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param;
11,035✔
193

194
  logGroupCacheExecInfo(pGrpCacheOperator);
11,035✔
195
  
196
  taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
11,035!
197
  taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
11,035!
198

199
  destroyGroupCacheDownstreamCtx(pGrpCacheOperator);
11,035✔
200
  destroySGcBlkCacheInfo(&pGrpCacheOperator->blkCache);
11,035✔
201
  taosHashCleanup(pGrpCacheOperator->pGrpHash);
11,035✔
202

203
  taosMemoryFree(pGrpCacheOperator->execInfo.pDownstreamBlkNum);
11,035!
204
  
205
  taosMemoryFreeClear(param);
11,035!
206
}
11,035✔
207

208
/*
 * Create/open the cache file `filename` (read/write, auto-delete) and
 * initialise the mutex stored next to the descriptor in pFileFd.
 *
 * pFileFd->fd is only published once both steps succeeded. If the mutex cannot
 * be initialised the freshly opened file is closed again, so callers never see
 * an open descriptor paired with an unusable mutex.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the open fails, or the mutex-init
 * error code.
 */
static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) {
  TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
  if (NULL == newFd) {
    QRY_ERR_RET(terrno);
  }

  int32_t code = taosThreadMutexInit(&pFileFd->mutex, NULL);
  if (code) {
    qError("taosThreadMutexInit failed, code:%x", code);
    // Do not leak the descriptor or leave a half-initialised entry behind.
    (void)taosCloseFile(&newFd);
    QRY_ERR_RET(code);
  }

  pFileFd->fd = newFd;

  qTrace("file path %s created", filename);

  return TSDB_CODE_SUCCESS;
}
225

226
/*
 * Look up (creating on demand) the cache-file entry `fileId` in pFileCtx and
 * hand back its descriptor with the descriptor mutex held.
 *
 * - The file hash is created lazily on first use.
 * - A missing entry is inserted zero-initialised, and the "_<fileId>" suffix is
 *   written into baseFilename at that point.
 * - If the entry is marked deleted, *pDeleted is set to true and the function
 *   returns success without touching *ppFd and without taking the mutex.
 * - Otherwise the file is opened if needed, the mutex is locked and *ppFd is
 *   set; the caller is expected to unlock via releaseFdToFileCtx().
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, SGroupCacheFileFd** ppFd, bool* pDeleted) {
  if (NULL == pFileCtx->pCacheFile) {
    pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    if (NULL == pFileCtx->pCacheFile) {
      return terrno;
    }
    taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo);
  }

  SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
  if (NULL == pFileInfo) {
    (void)snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", fileId);

    SGroupCacheFileInfo emptyInfo = {0};
    if (taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &emptyInfo, sizeof(emptyInfo))) {
      QRY_ERR_RET(terrno);
    }

    // Re-fetch to get the address of the copy stored inside the hash.
    pFileInfo = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
    if (NULL == pFileInfo) {
      qError("fail to get file %d from pCacheFile", fileId);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  }

  if (pFileInfo->deleted) {
    *pDeleted = true;
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pFileInfo->fd.fd) {
    int32_t code = initOpenCacheFile(&pFileInfo->fd, pFileCtx->baseFilename);
    if (code) {
      return code;
    }
  }

  (void)taosThreadMutexLock(&pFileInfo->fd.mutex);
  *ppFd = &pFileInfo->fd;

  return TSDB_CODE_SUCCESS;
}
268

269
/* Unlock the descriptor mutex taken by acquireFdFromFileCtx(); NULL is ignored. */
static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
  if (NULL != pFd) {
    (void)taosThreadMutexUnlock(&pFd->mutex);
  }
}
275

276
/*
 * Write the singly linked list of dirty blocks starting at pHead to their
 * cache files and drop each one from the dirty-block hash.
 *
 * For every list node:
 *   - pick the file context: the node's downstream context in batch-fetch mode,
 *     otherwise the vgroup context of the node's group (the group is acquired
 *     from the group hash and re-used while consecutive nodes share a groupId);
 *   - if the group or the target file is gone, skip the write but still remove
 *     the node from the dirty hash;
 *   - otherwise seek to the node's offset and write its buffer.
 *
 * The first failure stops processing. On exit any group reference still held
 * is released and blkCache.writeDownstreamId is reset from pCtx->id to -1.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SGroupCacheFileFd* pFd = NULL;
  SGcFileCacheCtx*   pFileCtx = NULL;
  SHashObj*          pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  int64_t            prevGroupId = 0;
  SGroupCacheData*   pCurGroup = NULL;

  while (NULL != pHead) {
    pFd = NULL;

    if (pGCache->batchFetch) {
      pFileCtx = &pHead->pCtx->fileCtx;
    } else {
      if (pHead->groupId != prevGroupId) {
        // Group changed: swap the held reference.
        if (NULL != pCurGroup) {
          taosHashRelease(pGrpHash, pCurGroup);
        }
        pCurGroup = taosHashAcquire(pGrpHash, &pHead->groupId, sizeof(pHead->groupId));
        prevGroupId = pHead->groupId;
      }

      if (NULL == pCurGroup) {
        qTrace("group %" PRIu64 " in downstream %d may already be deleted, skip write", pHead->groupId, pHead->pCtx->id);

        // Advance before removal: removing the hash entry invalidates the node.
        int64_t goneBlkId = pHead->basic.blkId;
        pHead = pHead->next;
        code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &goneBlkId, sizeof(goneBlkId));
        if (code) {
          qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", goneBlkId, tstrerror(code));
          goto _return;
        }
        continue;
      }

      pFileCtx = &pCurGroup->pVgCtx->fileCtx;
    }

    bool fileDeleted = false;
    code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd, &fileDeleted);
    if (code) {
      goto _return;
    }

    if (fileDeleted) {
      releaseFdToFileCtx(pFd);

      qTrace("FileId:%d-%d-%d already be deleted, skip write", 
          pCtx->id, pCurGroup ? pCurGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);

      int64_t skippedBlkId = pHead->basic.blkId;
      pHead = pHead->next;

      code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &skippedBlkId, sizeof(skippedBlkId));
      if (code) {
        qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", skippedBlkId, tstrerror(code));
        goto _return;
      }
      continue;
    }

    int64_t ioRet = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
    if (ioRet < 0) {
      releaseFdToFileCtx(pFd);
      code = terrno;
      goto _return;
    }

    ioRet = taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
    if (ioRet != pHead->basic.bufSize) {
      releaseFdToFileCtx(pFd);
      code = terrno;
      goto _return;
    }

    releaseFdToFileCtx(pFd);

    qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, 
        pCtx->id, pCurGroup ? pCurGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset);

    int64_t writtenBlkId = pHead->basic.blkId;
    pHead = pHead->next;

    code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &writtenBlkId, sizeof(writtenBlkId));
    if (code) {
      qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", writtenBlkId, tstrerror(code));
      goto _return;
    }
  }

_return:

  if (NULL != pCurGroup) {
    taosHashRelease(pGrpHash, pCurGroup);
  }

  // Give up the "writer" role claimed by the caller for this downstream.
  (void)atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);

  return code;
}
376

377

378
void freeGcBlkBufInfo(void* ptr) {
2,569✔
379
  SGcBlkBufInfo* pBlk = (SGcBlkBufInfo*)ptr;
2,569✔
380
  taosMemoryFreeClear(pBlk->pBuf);
2,569!
381
}
2,569✔
382

383

384
/*
 * Register a freshly serialized block as dirty and, if the cache has grown
 * past its limit, flush the oldest dirty blocks to disk.
 *
 * Steps:
 *   1. Copy *pBufInfo into the dirty-block hash (keyed by block id) and switch
 *      to the hash-owned copy; on insert failure the block buffer is freed.
 *   2. Under dirtyLock, account its size and append it to the dirty list.
 *   3. If maxCacheSize is non-negative and exceeded, try to claim the writer
 *      role (writeDownstreamId: -1 -> pCtx->id). The claimant detaches nodes
 *      from the list head until the accounted size is back within the limit.
 *   4. Outside the lock, write the detached nodes via saveBlocksToDisk().
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
  if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) {
    freeGcBlkBufInfo(pBufInfo);
    return terrno;
  }

  // Look up the hash-owned copy. Keep the caller's struct around until the
  // lookup is known to have succeeded so the error path can still read the id
  // (previously the NULL lookup result itself was dereferenced for logging).
  SGcBlkBufInfo* pCached = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
  if (NULL == pCached) {
    qError("fail to get blk %" PRId64 " from pCache->pDirtyBlk", pBufInfo->basic.blkId);
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pBufInfo = pCached;

  int32_t code = TSDB_CODE_SUCCESS;
  SGcBlkBufInfo* pWriteHead = NULL;
  
  taosWLockLatch(&pCache->dirtyLock);
  pCache->blkCacheSize += pBufInfo->basic.bufSize;
  qDebug("group cache total dirty block num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);

  if (NULL == pCache->pDirtyHead) {
    pCache->pDirtyHead = pBufInfo;
  } else {
    pCache->pDirtyTail->next = pBufInfo;
  }
  pCache->pDirtyTail = pBufInfo;
    
  if (pGCache->maxCacheSize >= 0 && pCache->blkCacheSize > pGCache->maxCacheSize) {
    // Only one downstream at a time may flush; others just leave their block queued.
    if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) {
      pWriteHead = pCache->pDirtyHead;
      SGcBlkBufInfo* pTmp = pCache->pDirtyHead;
      while (NULL != pTmp) {
        pCache->blkCacheSize -= pTmp->basic.bufSize;
        if (pCache->blkCacheSize <= pGCache->maxCacheSize) {
          // Cut the list after pTmp: [pWriteHead .. pTmp] goes to disk.
          pCache->pDirtyHead = pTmp->next;
          pTmp->next = NULL;
          break;
        }
        pTmp = pTmp->next;
      }
    }
  }
  taosWUnLockLatch(&pCache->dirtyLock);

  if (NULL != pWriteHead) {
    code = saveBlocksToDisk(pGCache, pCtx, pWriteHead);
  }

  return code;
}
431

432
/*
 * Remove the file context's current cache file if its entry exists and no
 * group references it any more (groupNum == 0). downstreamIdx and vgId are
 * only used for the trace message.
 */
static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) {
  SGroupCacheFileInfo* pCurrFile = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
  if (NULL == pCurrFile || 0 != pCurrFile->groupNum) {
    return;
  }

  removeGroupCacheFile(pCurrFile);

#if 0  
  /* debug only */
  snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", pFileCtx->fileId);
  taosRemoveFile(pFileCtx->baseFilename);
  /* debug only */
#endif

  qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId);
}
452

453
/*
 * Roll over to a new cache file once the current one has reached
 * GROUP_CACHE_DEFAULT_MAX_FILE_SIZE: optionally try to remove the current file
 * (removeCheck), then bump fileId and reset the running file size.
 */
static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId, bool removeCheck) {
  if (pFileCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
    if (removeCheck) {
      chkRemoveVgroupCurrFile(pFileCtx, downstreamIdx, vgId);
    }

    pFileCtx->fileId++;
    pFileCtx->fileSize = 0;
  }
}
465

466

467
/*
 * Serialize pBlock into a newly allocated buffer, fill in pBufInfo (block id,
 * file id, size, file offset, owning downstream, group id) and queue it as a
 * dirty block.
 *
 * The file offset is reserved by atomically adding the buffer size to the file
 * context's running size. In batch-fetch mode the downstream's file context is
 * used and a roll-over to a new file is checked afterwards; otherwise the
 * group's vgroup file context and the group's file id are used.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. If serialization fails the
 * buffer is released before returning (it used to be leaked).
 */
static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t);
  pBufInfo->pBuf = taosMemoryMalloc(bufSize);
  if (NULL == pBufInfo->pBuf) {
    qError("group cache add block to cache failed, size:%" PRId64, bufSize);
    return terrno;
  }

  int32_t code = blockDataToBuf(pBufInfo->pBuf, pBlock);
  if (code) {
    // Nobody else references the buffer yet, so free it here.
    taosMemoryFreeClear(pBufInfo->pBuf);
    QRY_ERR_RET(code);
  }

  SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx;

  pBufInfo->next = NULL;
  pBufInfo->basic.blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1);
  pBufInfo->basic.fileId = pGCache->batchFetch ? pFileCtx->fileId : pGroup->fileId;
  pBufInfo->basic.bufSize = bufSize;
  // fetch-then-add: the old size is this block's offset inside the file.
  pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize);
  pBufInfo->pCtx = pCtx;
  pBufInfo->groupId = pBlock->info.id.groupId;

  if (pGCache->batchFetch) {    
    groupCacheSwitchNewFile(pFileCtx, pCtx->id, pGroup->vgId, false);
  }

  code = addBlkToDirtyBufList(pGCache, pCtx, &pGCache->blkCache, pBufInfo);

  return code;
}
495

496
/*
 * Detach (without freeing) all column payload pointers of pDataBlock and reset
 * it to zero rows / zero capacity. Unlike blockDataDeepCleanup() nothing is
 * released, so this is only appropriate when the block does not own the
 * buffers its columns currently point at.
 */
void blockDataDeepClear(SSDataBlock* pDataBlock) {
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, i);
    if (NULL == pCol) {
      qError("fail to get %d col from pDataBlock, numOfCols:%d", i, (int32_t)numOfCols);
      continue;
    }

    pCol->pData = NULL;

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->nullbitmap = NULL;
      continue;
    }

    pCol->varmeta.offset = NULL;
    pCol->varmeta.length = 0;
    pCol->varmeta.allocLen = 0;
  }

  pDataBlock->info.capacity = 0;
  pDataBlock->info.rows = 0;
}
20,737✔
516

517
/*
 * Allocate an empty "base" block shaped like pSrc: same block info and a
 * shallow copy of the column array, with every column payload pointer cleared
 * and rows/capacity reset to 0.
 *
 * On success *ppDst owns the new block. On failure *ppDst is NULL (it used to
 * be left pointing at freed memory) and terrno as observed at the point of
 * failure is returned.
 */
static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) {
  *ppDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *ppDst) {
    return terrno;
  }

  (*ppDst)->pBlockAgg = NULL;
  (*ppDst)->pDataBlock = taosArrayDup(pSrc->pDataBlock, NULL);
  if (NULL == (*ppDst)->pDataBlock) {
    // Capture the error before freeing, and never hand back a dangling pointer.
    int32_t code = terrno;
    taosMemoryFreeClear(*ppDst);
    return code;
  }

  TAOS_MEMCPY(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info));
  // The duplicated column descriptors still point at pSrc's payloads; detach them.
  blockDataDeepClear(*ppDst);

  return TSDB_CODE_SUCCESS;
}
533

534
/*
 * Obtain an empty result block for this downstream: pop one from the pool of
 * free blocks if available, otherwise build a new one from the downstream's
 * base block. The pool is accessed under blkLock; building happens after the
 * lock has been dropped.
 */
static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** ppRes) {
  taosWLockLatch(&pCtx->blkLock);

  if (taosArrayGetSize(pCtx->pFreeBlock) > 0) {
    *ppRes = *(SSDataBlock**)taosArrayPop(pCtx->pFreeBlock);
    taosWUnLockLatch(&pCtx->blkLock);
    return TSDB_CODE_SUCCESS;
  }

  taosWUnLockLatch(&pCtx->blkLock);

  return buildGroupCacheBaseBlock(ppRes, pCtx->pBaseBlock);
}
545

546
/*
 * Return a result block to the downstream's pool: its column payloads are
 * freed first, then the empty shell is pushed onto pFreeBlock under blkLock.
 * Returns terrno if the push fails, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
  blockDataDeepCleanup(pBlock);

  taosWLockLatch(&pCtx->blkLock);
  void*   pSlot = taosArrayPush(pCtx->pFreeBlock, &pBlock);
  int32_t code = (NULL == pSlot) ? terrno : TSDB_CODE_SUCCESS;
  taosWUnLockLatch(&pCtx->blkLock);

  return code;
}
558

559

560
/*
 * Materialize a result block for downstream `downstreamIdx` from the
 * serialized buffer pBuf: acquire an empty block, then decode pBuf into it.
 */
static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, void* pBuf, SSDataBlock** ppRes) {
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];

  int32_t code = acquireBaseBlockFromList(pCtx, ppRes);
  if (TSDB_CODE_SUCCESS == code) {
    //TODO OPTIMIZE PERF
    code = blockDataFromBuf(*ppRes, pBuf);
  }

  return code;
}
568

569
/*
 * Read one serialized block (described by pBasic: file id, offset, size) back
 * from its cache file into a newly allocated buffer.
 *
 * The file context is the downstream's in batch-fetch mode, otherwise the
 * group's vgroup context. The descriptor mutex is held for the seek + read and
 * released before returning.
 *
 * On success *ppBuf owns a buffer of pBasic->bufSize bytes that the caller
 * must free. On failure *ppBuf is NULL where it was allocated here, and a
 * non-zero error code is always returned -- including for a short read that
 * did not set terrno, which previously surfaced as "success" with a NULL
 * buffer.
 */
static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) {
  SGroupCacheFileFd *pFileFd = NULL;
  SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx;
  bool deleted = false;
  int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd, &deleted);
  if (code) {
    return code;
  }
  if (deleted) {
    // acquireFdFromFileCtx() does not take the mutex for deleted files, so nothing to release.
    qError("FileId:%d-%d-%d already be deleted, skip read", pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  
  int64_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET);
  if (ret < 0) {
    code = terrno;
    goto _return;
  }

  *ppBuf = taosMemoryMalloc(pBasic->bufSize);
  if (NULL == *ppBuf) {
    code = terrno;
    goto _return;
  }
  
  ret = taosReadFile(pFileFd->fd, *ppBuf, pBasic->bufSize);
  if (ret != pBasic->bufSize) {
    // Capture the error before freeing, and make sure a short read is never reported as success.
    code = terrno;
    if (TSDB_CODE_SUCCESS == code) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    taosMemoryFreeClear(*ppBuf);
    goto _return;
  }

  qTrace("FileId:%d-%d-%d blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64, 
      pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset);

_return:

  releaseFdToFileCtx(pFileFd);
  return code;
}
609

610
/*
 * Produce the result block for block `pBasic` of group pGrp.
 *
 * The serialized data is taken from the dirty-block hash when the block has
 * not been flushed yet, otherwise it is read back from disk into a temporary
 * buffer that is freed again once decoded. The decoded block is returned via
 * *ppRes and also recorded in the read-block hash under sessionId.
 */
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, SGcBlkBufBasic* pBasic, SSDataBlock** ppRes) {
  SGcBlkCacheInfo* pCache = &pGCache->blkCache;
  int32_t          code = TSDB_CODE_SUCCESS;
  void*            pBuf = NULL;

  SGcBlkBufInfo* pDirty = taosHashAcquire(pCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId));
  bool           fromDisk = (NULL == pDirty);

  if (fromDisk) {
    code = readBlockFromDisk(pGCache, pGrp, pBasic, &pBuf);
    if (code) {
      return code;
    }
  } else {
    pBuf = pDirty->pBuf;
  }

  code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes);

  taosHashRelease(pCache->pDirtyBlk, pDirty);
  if (fromDisk) {
    // The buffer came from readBlockFromDisk() and is owned by us.
    taosMemoryFree(pBuf);
  }

  if (code) {
    return code;
  }

  QRY_ERR_RET(taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES));

  return TSDB_CODE_SUCCESS;
}
639

640
/*
 * Initialise a per-vgroup context: remember the table list and vgroup id, and
 * build the cache-file base name
 *   <tsTempDir>/gc_<pid>_<queryId hex>_<taskId>_<downstreamId>_<vgId>
 * recording its length so a file-id suffix can be appended later.
 */
static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
  SGcFileCacheCtx* pFileCtx = &pVgCtx->fileCtx;

  pVgCtx->pTbList = pTbList;
  pVgCtx->id = vgId;

  (void)snprintf(pFileCtx->baseFilename, sizeof(pFileCtx->baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d", 
     tsTempDir, taosGetPId(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId);
  // Force termination in the last byte regardless of truncation.
  pFileCtx->baseFilename[sizeof(pFileCtx->baseFilename) - 1] = 0;

  pFileCtx->baseNameLen = strlen(pFileCtx->baseFilename);
}
22,070✔
649

650
/*
 * Attach the new group described by pNew to its vgroup context.
 *
 * If the group already has a vgroup context, pNew is simply appended to that
 * context's table list. Otherwise a new context (with a fresh table list
 * containing pNew) is inserted into pHash keyed by vgId, and the group is
 * pointed at the hash-owned copy.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The temporary table list is
 * destroyed on every failure that happens before the hash has taken ownership
 * of it (it used to be leaked).
 */
static int32_t addNewGroupToVgHash(SOperatorInfo* pOperator, SSHashObj* pHash, SGcNewGroupInfo* pNew) {
  SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx;
  if (NULL == pVgCtx) {
    SArray* pList = taosArrayInit(10, sizeof(*pNew));
    if (NULL == pList) {
      return terrno;
    }
    if (NULL == taosArrayPush(pList, pNew)) {
      int32_t pushCode = terrno;  // capture before cleanup can disturb it
      taosArrayDestroy(pList);
      QRY_ERR_RET(pushCode);
    }
    
    SGcVgroupCtx vgCtx = {0};
    initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList);

    int32_t putCode = tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx));
    if (putCode) {
      // The hash did not store the context, so the list is still ours.
      taosArrayDestroy(pList);
      QRY_ERR_RET(putCode);
    }
    
    // From here on the list belongs to the context stored in the hash.
    pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
    if (NULL == pNew->pGroup->pVgCtx) {
      qError("fail to get vg %d ctx from vgHash", pNew->vgId);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
    
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pVgCtx->pTbList, pNew)) {
    QRY_ERR_RET(terrno);
  }
  
  return TSDB_CODE_SUCCESS;
}
680

681
/*
 * Drain the downstream's list of newly added groups and combine their operator
 * params into a single param for the next downstream fetch.
 *
 * Under grpLock, for every pending group:
 *   - in non-batch mode, register it with its vgroup context;
 *   - take its param as the result, or merge it into the param collected so far.
 * The pending list is cleared only when every entry was processed.
 *
 * *ppParam always receives the (possibly NULL, possibly partial) combined
 * param. Returns TSDB_CODE_SUCCESS or the first error.
 *
 * Every exit path goes through _return so grpLock is always released; the
 * missing-entry error used to return directly with the lock still held.
 */
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) {
  int32_t code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];  
  SOperatorParam* pDst = NULL;
  
  taosWLockLatch(&pCtx->grpLock);
  int32_t num = taosArrayGetSize(pCtx->pNewGrpList);
  if (num <= 0) {
    goto _return;
  }

  for (int32_t i = 0; i < num; ++i) {
    SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i);
    if (NULL == pNew) {
      qError("fail to get vg %d SGcNewGroupInfo from pNewGrpList", i);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      terrno = code;
      goto _return;
    }
    
    if (!pGCache->batchFetch) {
      code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew);
      if (code) {
        goto _return;
      }
    }

    if (NULL == pDst) {
      pDst = pNew->pParam;
    } else if (pNew->pParam) {
      code = mergeOperatorParams(pDst, pNew->pParam);
      if (code) {
        goto _return;
      }
    }
  }

  taosArrayClear(pCtx->pNewGrpList);
  
_return:

  taosWUnLockLatch(&pCtx->grpLock);
  *ppParam = pDst;
  
  return code;
}
726

727
/*
 * Fetch the next block from downstream operator `downstreamIdx`.
 *
 * Pending new-group params are collected first; if there are any, the
 * downstream's extended fetch (getNextExtFn) is called with them, otherwise
 * the plain getNextFn is used. For a non-NULL block the per-downstream block
 * counter is bumped and, on the first block ever seen from this downstream, a
 * base block is built from it and added to the free-block pool.
 *
 * *ppRes receives the fetched block (may be NULL); the return value is the
 * result of blockDataCheck() on it, or an earlier error code (in which case
 * *ppRes is left untouched).
 */
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx,
                                                         SSDataBlock** ppRes) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SOperatorParam*          pParam = NULL;
  SSDataBlock*             pFetched = NULL;

  int32_t code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pParam);
  if (code) {
    return code;
  }

  SOperatorInfo* pDownstream = pOperator->pDownstream[downstreamIdx];
  code = (NULL != pParam) ? pDownstream->fpSet.getNextExtFn(pDownstream, pParam, &pFetched)
                          : pDownstream->fpSet.getNextFn(pDownstream, &pFetched);
  if (code) {
    qError("failed to get block from downstream, code:%s %s", tstrerror(code), GET_TASKID(pOperator->pTaskInfo));
    return code;
  }

  if (NULL != pFetched) {
    SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];

    qDebug("%s res block retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pFetched->info.id.groupId);

    pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;

    if (NULL == pCtx->pBaseBlock) {
      // First block from this downstream: use it as the template for pooled blocks.
      code = buildGroupCacheBaseBlock(&pCtx->pBaseBlock, pFetched);
      if (code) {
        return code;
      }

      if (NULL == taosArrayPush(pCtx->pFreeBlock, &pCtx->pBaseBlock)) {
        QRY_ERR_RET(terrno);
      }
    }
  }

  code = blockDataCheck(pFetched);

  *ppRes = pFetched;
  return code;
}
773

774
/*
 * Post the wait semaphore of every session stored in `pWaitQueue`.
 *
 * A NULL or empty queue is a no-op. The queue itself is not modified here.
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if a slot holds a NULL session,
 * or the tsem_post() error through QRY_ERR_RET; remaining sessions are then
 * not notified.
 */
static int32_t notifyWaitingSessions(SArray* pWaitQueue) {
  if (NULL == pWaitQueue) {
    return TSDB_CODE_SUCCESS;
  }
  if (taosArrayGetSize(pWaitQueue) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t total = taosArrayGetSize(pWaitQueue);
  int32_t idx = 0;
  while (idx < total) {
    SGcSessionCtx* pWaiter = taosArrayGetP(pWaitQueue, idx);
    if (NULL == pWaiter) {
      qError("fail to get %d SGcSessionCtx in pWaitQueue, total:%d", idx, total);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    QRY_ERR_RET(tsem_post(&pWaiter->waitSem));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}
792

793
/*
 * Mark `pGroup` as completely fetched: drop its pending block pointer, set the
 * fetchDone flag atomically, then (under the group mutex) notify every session
 * in the group's wait queue and clear that queue.
 *
 * Returns the result of notifyWaitingSessions(); the queue is cleared even when
 * notification reports an error.
 */
static FORCE_INLINE int32_t handleGroupFetchDone(SGroupCacheData* pGroup) {
  pGroup->pBlock = NULL;
  atomic_store_8((int8_t*)&pGroup->fetchDone, true);

  (void)taosThreadMutexLock(&pGroup->mutex);
  int32_t notifyCode = notifyWaitingSessions(pGroup->waitQueue);
  taosArrayClear(pGroup->waitQueue);
  (void)taosThreadMutexUnlock(&pGroup->mutex);

  return notifyCode;
}
805

806
/*
 * Increase the group count recorded for cache file `fileId` in
 * pFileCtx->pCacheFile, creating the hash table and/or the file entry on
 * first use (a new entry starts with groupNum == 1).
 *
 * `downstreamId` and `vgId` are used for tracing only.
 * Returns TSDB_CODE_SUCCESS, terrno when the hash table cannot be created, the
 * taosHashPut() error, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the entry
 * cannot be read back after insertion.
 */
static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) {
  if (NULL == pFileCtx->pCacheFile) {
    pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    if (NULL == pFileCtx->pCacheFile) {
      return terrno;
    }
    taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo);
  }

  SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
  if (NULL != pFileInfo) {
    // Known file: just count one more group.
    pFileInfo->groupNum++;
  } else {
    // First group for this file: append the "_<fileId>" suffix after the base name.
    (void)snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%u", fileId);

    SGroupCacheFileInfo freshEntry = {0};
    freshEntry.groupNum = 1;
    QRY_ERR_RET(taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &freshEntry, sizeof(freshEntry)));

    // Re-read the stored copy so the trace below reports the hash entry itself.
    pFileInfo = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
    if (NULL == pFileInfo) {
      qError("fail to get file %d in pCacheFile", fileId);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  }

  qTrace("FileId:%d-%d-%d add groupNum to %u", downstreamId, vgId, fileId, pFileInfo->groupNum);

  return TSDB_CODE_SUCCESS;
}
835

836
/*
 * Called when a block for table `uid` arrives in non-batch mode.
 *
 * If `uid` equals the last uid recorded on either the downstream or the vgroup
 * context, nothing is done. Otherwise both records are updated, every entry of
 * the vgroup table list that precedes `uid` is marked fetch-done, the cache
 * file is possibly switched, and the group's fileId/startOffset are taken from
 * the vgroup file context. When the group needs caching, the file's group
 * count is increased as well.
 */
static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
  SGcVgroupCtx* pVgCtx = pGroup->pVgCtx;

  if (pCtx->lastBlkUid == uid || pVgCtx->lastBlkUid == uid) {
    return TSDB_CODE_SUCCESS;
  }

  pCtx->lastBlkUid = uid;
  pVgCtx->lastBlkUid = uid;

  // Walk the table list until `uid` (or the end of the list) is reached.
  for (int32_t tbIdx = 0;; ++tbIdx) {
    SGcNewGroupInfo* pEntry = taosArrayGet(pVgCtx->pTbList, tbIdx);
    if (NULL == pEntry || pEntry->uid == uid) {
      break;
    }
    QRY_ERR_RET(handleGroupFetchDone(pEntry->pGroup));
  }

  groupCacheSwitchNewFile(&pVgCtx->fileCtx, pGroup->downstreamIdx, pGroup->vgId, true);

  pGroup->fileId = pVgCtx->fileCtx.fileId;
  pGroup->startOffset = pVgCtx->fileCtx.fileSize;

  qTrace("FileId:%d-%d-%d add groupNum for group %" PRIu64, pGroup->downstreamIdx, pGroup->vgId, pVgCtx->fileCtx.fileId, uid);

  if (!pGroup->needCache) {
    return TSDB_CODE_SUCCESS;
  }

  return addFileRefTableNum(&pVgCtx->fileCtx, pVgCtx->fileCtx.fileId, pGroup->downstreamIdx, pGroup->vgId);
}
866

867

868
/*
 * Initialise a freshly inserted group entry: mutex, identifiers, an empty block
 * list (initial capacity 10), the caching flag and the vgroup context looked up
 * by vgId in pCtx->pVgTbHash.
 *
 * fileId and startOffset start at -1. `batchFetch` is currently not used here.
 * If the block list cannot be allocated the function returns early and
 * startOffset/needCache/pVgCtx are left as they were.
 */
static FORCE_INLINE int32_t initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) {
  QRY_ERR_RET(taosThreadMutexInit(&pGroup->mutex, NULL));

  pGroup->downstreamIdx = downstreamIdx;
  pGroup->vgId = vgId;
  pGroup->fileId = -1;

  SArray* pBlkArray = taosArrayInit(10, sizeof(SGcBlkBufBasic));
  pGroup->blkList.pList = pBlkArray;
  if (NULL == pBlkArray) {
    QRY_ERR_RET(terrno);
  }

  pGroup->startOffset = -1;
  pGroup->needCache = needCache;
  // The lookup result is stored as-is; it is NULL when vgId has no entry in pVgTbHash.
  pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));

  return TSDB_CODE_SUCCESS;
}
884

885
/*
 * Insert a group entry keyed by `uid` into the group hash (global or
 * per-downstream), initialise it and return it through *ppGrp.
 *
 * When pParam->pChildren is set, the first child parameter is queued on
 * pCtx->pNewGrpList (under grpLock), pParam->pChildren is destroyed and reset
 * to NULL, and the downstream is flagged as not fetch-done.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. A failed lookup after insertion
 * never reports success: if terrno is not set, an internal error is returned.
 */
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  SGcOperatorParam* pGcParam = pParam->value;
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  SGroupCacheData grpData = {0};

  // A duplicate key is tolerated: the existing entry is looked up below.
  // NOTE(review): on TSDB_CODE_DUP_KEY the existing entry is re-initialised by
  // initNewGroupData() just like a new one (same as before this change);
  // confirm whether that is intended.
  if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData)) && TSDB_CODE_DUP_KEY != terrno) {
    return terrno;
  }

  *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
  if (NULL == *ppGrp) {
    qError("fail to get group %" PRId64 " in group hash", uid);
    // Never hand back success together with a NULL group.
    return (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  QRY_ERR_RET(initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache));

  qDebug("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);

  if (pParam->pChildren) {
    SGcNewGroupInfo newGroup = {0};
    newGroup.pGroup = *ppGrp;
    newGroup.vgId = vgId;
    newGroup.uid = uid;
    newGroup.pParam = taosArrayGetP(pParam->pChildren, 0);

    taosWLockLatch(&pCtx->grpLock);
    if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
      taosWUnLockLatch(&pCtx->grpLock);
      return terrno;
    }
    taosWUnLockLatch(&pCtx->grpLock);

    // The child parameter is now referenced from pNewGrpList; release the container.
    taosArrayDestroy(pParam->pChildren);
    pParam->pChildren = NULL;
    pCtx->fetchDone = false;
  }

  return TSDB_CODE_SUCCESS;
}
936

937
/*
 * Append the basic info of `pNewBlk` to the group's block list and report the
 * index of the new element through *pIdx.
 *
 * The list is modified under the block-list write latch, which is released on
 * every path, including the push-failure path. `batchFetch` is currently not
 * used here.
 *
 * Returns TSDB_CODE_SUCCESS, or the push error (terrno, or an internal error
 * if terrno is not set); *pIdx is untouched on failure.
 */
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) {
  taosWLockLatch(&pGroup->blkList.lock);
  if (NULL == taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic)) {
    // Capture the error before unlocking, then make sure the latch is not leaked.
    int32_t code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    taosWUnLockLatch(&pGroup->blkList.lock);
    return code;
  }
  *pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1;
  taosWUnLockLatch(&pGroup->blkList.lock);

  qDebug("block added to group cache, total block num:%" PRId64, *pIdx + 1);

  return TSDB_CODE_SUCCESS;
}
949

950
/*
 * Route a block just fetched from downstream to the group it belongs to.
 *
 * The group is looked up by the block's groupId. In batch mode an unknown
 * group is created on the fly (with needCache forced to true); otherwise an
 * unknown group is an error. In non-batch mode the vgroup bookkeeping is
 * updated first. The block is then either stored in the buffer cache and
 * recorded in the group's block list, or (no caching) attached directly to the
 * group. Sessions waiting on the group are notified, and when the group is the
 * one the calling session is reading, *continueFetch is cleared.
 */
static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t addedBlkIdx = 0;

  SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId));
  if (NULL == pGroup) {
    if (!pGCache->batchFetch) {
      qError("group %" PRIu64 " not found in group hash", pBlock->info.id.groupId);
      return TSDB_CODE_INVALID_PARA;
    }

    // Batch mode: synthesise the parameters needed to register the group.
    SGcOperatorParam tmpGcParam = {0};
    SOperatorParam tmpParam = {0};
    tmpGcParam.needCache = true;
    tmpParam.downstreamIdx = pSession->downstreamIdx;
    tmpParam.value = &tmpGcParam;
    tmpParam.reUse = false;
    code = addNewGroupData(pOperator, &tmpParam, &pGroup, GROUP_CACHE_DEFAULT_VGID, pBlock->info.id.groupId);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  if (!pGCache->batchFetch) {
    code = handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  if (!pGroup->needCache) {
    qDebug("no need to add block to group cache");

    pGroup->pBlock = pBlock;
  } else {
    qDebug("add block to group cache");

    SGcBlkBufInfo blkBuf;
    code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &blkBuf);
    if (code) {
      return code;
    }

    code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &blkBuf, &addedBlkIdx);
    if (code) {
      return code;
    }
  }

  QRY_ERR_RET(notifyWaitingSessions(pGroup->waitQueue));
  if (pGroup == pSession->pGroupData) {
    // The caller's own group got data: remember the block position and stop fetching.
    if (pGroup->needCache) {
      pSession->lastBlkId = addedBlkIdx;
    }

    *continueFetch = false;
  }

  return TSDB_CODE_SUCCESS;
}
1013

1014
/*
 * Handle exhaustion of the downstream that `pSession` reads from.
 *
 * Batch mode: every group in the group hash is marked fetch-done and the
 * downstream context is flagged fetchDone. Non-batch mode: every group listed
 * in each vgroup's table list is marked fetch-done and the lists are cleared.
 * Finally the downstream's wait-session hash is cleared.
 *
 * Returns TSDB_CODE_SUCCESS, or the first handleGroupFetchDone() error. When
 * leaving the hash iteration early the iterator is cancelled first so the
 * entry acquired by taosHashIterate() is released.
 */
static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) {
  int32_t code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];

  if (pGCache->batchFetch) {
    SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
    SGroupCacheData* pGroup = NULL;
    while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) {
      code = handleGroupFetchDone(pGroup);
      if (TSDB_CODE_SUCCESS != code) {
        // Abandoning the iteration: release the current entry before returning.
        taosHashCancelIterate(pGrpHash, pGroup);
        QRY_ERR_RET(code);
      }
    }
    pCtx->fetchDone = true;
  } else {
    int32_t uidNum = 0;
    SGcVgroupCtx* pVgCtx = NULL;
    int32_t iter = 0;
    while (NULL != (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter))) {
      uidNum = taosArrayGetSize(pVgCtx->pTbList);
      for (int32_t i = 0; i < uidNum; ++i) {
        SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i);
        QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup));
      }
      taosArrayClear(pVgCtx->pTbList);
    }
  }

  taosHashClear(pCtx->pWaitSessions);

  return TSDB_CODE_SUCCESS;
}
1043

1044
/*
 * Drive the downstream on behalf of session `sessionId` until a block for the
 * session's own group arrives or the downstream is exhausted.
 *
 * Every fetched block is dispatched through handleGroupCacheRetrievedBlk();
 * a NULL block triggers handleDownstreamFetchDone(). Afterwards the fetch role
 * (pCtx->fetchSessionId) is either handed to one waiting session, which is
 * flagged newFetch, removed from the wait hash and woken up, or released by
 * resetting fetchSessionId to -1. A mismatch of the expected fetchSessionId is
 * reported as TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 */
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    continueFetch = true;

  while (continueFetch && TSDB_CODE_SUCCESS == code) {
    QRY_ERR_RET(getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes));

    if (NULL == *ppRes) {
      QRY_ERR_RET(handleDownstreamFetchDone(pOperator, pSession));
      break;
    }

    QRY_ERR_RET(handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch));
  }

  if (!continueFetch) {
    // This session got its block; try to pass the fetch role to a waiter.
    SGcSessionCtx** ppWaiter = taosHashIterate(pCtx->pWaitSessions, NULL);
    if (ppWaiter) {
      taosHashCancelIterate(pCtx->pWaitSessions, ppWaiter);
      int64_t* pWaiterId = taosHashGetKey(ppWaiter, NULL);
      if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, *pWaiterId)) {
        qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64, pCtx->fetchSessionId, sessionId);
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      SGcSessionCtx* pWaiter = *ppWaiter;
      pWaiter->newFetch = true;
      code = taosHashRemove(pCtx->pWaitSessions, pWaiterId, sizeof(*pWaiterId));
      if (code) {
        qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", *pWaiterId, tstrerror(code));
        return code;
      }
      QRY_ERR_RET(tsem_post(&pWaiter->waitSem));

      return code;
    }
  }

  // Nobody to hand over to: give up the fetch role.
  if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, -1)) {
    qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64, pCtx->fetchSessionId, sessionId);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  return code;
}
1089

1090
/*
 * Try to serve the next block of a session without touching the downstream.
 *
 * *got is set to true when the call produced a final answer for this round:
 * a cached block, the group's directly attached block, or NULL because the
 * group (or the whole downstream) is done. *got is false when nothing is
 * available yet and the caller has to fetch from downstream.
 *
 * For cached groups the next block index is 0 when the session has not read
 * anything yet (lastBlkId < 0), otherwise lastBlkId + 1; lastBlkId is advanced
 * to that index after the retrieve call regardless of its result.
 */
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes, bool* got) {
  int32_t code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGroupCacheData* pGroup = pSession->pGroupData;
  *got = true;

  if (NULL == pGroup) {
    *ppRes = NULL;
    qDebug("sessionId: %" PRIu64 " fetch done since downstream fetch done", sessionId);
    return code;
  }

  if (pGroup->needCache) {
    SGcBlkList* pBlkList = &pGroup->blkList;
    taosRLockLatch(&pBlkList->lock);
    int64_t blkNum = taosArrayGetSize(pBlkList->pList);
    int64_t nextIdx = (pSession->lastBlkId < 0) ? 0 : (pSession->lastBlkId + 1);
    if (nextIdx < blkNum) {
      // NOTE(review): pBasic points into the list and is used after the read
      // latch is released; confirm the list cannot be reallocated meanwhile.
      SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, nextIdx);
      taosRUnLockLatch(&pBlkList->lock);
      code = retrieveBlkFromBufCache(pGCache, pGroup, sessionId, pBasic, ppRes);
      pSession->lastBlkId = nextIdx;
      return code;
    }
    taosRUnLockLatch(&pBlkList->lock);
  } else if (pGroup->pBlock) {
    // Uncached group: hand over the attached block exactly once.
    *ppRes = pGroup->pBlock;
    pGroup->pBlock = NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8((int8_t*)&pGroup->fetchDone)) {
    *ppRes = NULL;
    qDebug("sessionId: %" PRIu64 " fetch done", sessionId);
    return code;
  }

  *got = false;
  return code;
}
1136

1137

UNCOV
1138
/*
 * Block a session until another session delivers data for its group.
 *
 * The waiting logic is compiled out for now: the active branch only logs an
 * error and returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR. The disabled branch
 * is kept for reference; it expects the group mutex to be held on entry.
 */
static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  // FOR NOW, IT'S ERROR TO REACH HERE
#if 1
  qError("should not enter session wait");
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
#else
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGroupCacheData* pGroup = pSession->pGroupData;
  int32_t code = TSDB_CODE_SUCCESS;
  bool inLock = true;

  // Register this session in the group's wait queue (created on demand).
  if (NULL == pGroup->waitQueue) {
    pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
    if (NULL == pGroup->waitQueue) {
      QRY_ERR_JRET(terrno);
    }
  }

  if (NULL == taosArrayPush(pGroup->waitQueue, &pSession)) {
    QRY_ERR_JRET(terrno);
  }

  if (!pSession->semInit) {
    QRY_ERR_JRET(tsem_init(&pSession->waitSem, 0, 0));
    pSession->semInit = true;
  }

  (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
  inLock = false;

  QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES));

  code = tsem_wait(&pSession->waitSem);
  if (code) {
    qError("tsem_wait failed, error:%s", tstrerror(code));
    QRY_ERR_JRET(code);
  }

  // Woken up as the new fetcher: drive the downstream ourselves.
  if (pSession->newFetch) {
    pSession->newFetch = false;
    return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
  }

  code = taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
  if (code) {
    qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", sessionId, tstrerror(code));
    QRY_ERR_JRET(code);
  }

  bool got = false;
  return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);

_return:

  if (inLock) {
    (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
  }

  return code;
#endif
}
1198

1199

1200
/*
 * Produce the next block for a session: first from the cache, and if nothing
 * is available there, by becoming the fetcher of the downstream.
 *
 * The session becomes the fetcher when it already owns pCtx->fetchSessionId or
 * manages to swap it from -1 to its own id. Any other owner is currently
 * treated as an internal error. The statements after the ownership check are
 * not reachable at the moment (both branches above them leave the loop body);
 * they are kept together with groupCacheSessionWait() for the disabled
 * multi-session wait path.
 */
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
  int32_t code = TSDB_CODE_SUCCESS;
  bool locked = false;

  for (;;) {
    bool got = false;
    code = getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
    if (TSDB_CODE_SUCCESS != code || got) {
      goto _return;
    }

    bool ownsFetch = (atomic_load_64(&pCtx->fetchSessionId) == sessionId)
                  || (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId));
    if (ownsFetch) {
      if (locked) {
        (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
        locked = false;
      }

      code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
      goto _return;
    } else {
      // FOR NOW, SHOULD NOT REACH HERE
      qError("Invalid fetchSessionId:%" PRId64 ", currentSessionId:%" PRId64, pCtx->fetchSessionId, sessionId);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    if (locked) {
      code = groupCacheSessionWait(pOperator, pCtx, sessionId, pSession, ppRes);
      locked = false;
      if (TSDB_CODE_SUCCESS != code) {
        goto _return;
      }

      break;
    }

    (void)taosThreadMutexLock(&pSession->pGroupData->mutex);
    locked = true;
  }

_return:

  if (locked) {
    (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
  }

  return code;
}
1251

1252

1253
/*
 * Create the two hash tables of the operator's block cache (dirty blocks and
 * blocks currently being read, both keyed with the BIGINT hash function) and
 * reset writeDownstreamId to -1.
 *
 * Returns terrno as soon as one of the tables cannot be created.
 */
static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
  SGcBlkCacheInfo* pBlkCache = &pInfo->blkCache;

  pBlkCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pBlkCache->pDirtyBlk) {
    return terrno;
  }
  taosHashSetFreeFp(pBlkCache->pDirtyBlk, freeGcBlkBufInfo);

  pBlkCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pBlkCache->pReadBlk) {
    return terrno;
  }

  pBlkCache->writeDownstreamId = -1;

  return TSDB_CODE_SUCCESS;
}
1268

1269
/*
 * Fill a session context for reading `pGroup`: remember the operator
 * parameter and its downstream index, attach the group, and set lastBlkId to
 * -1 (no block read yet).
 */
static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) {
  pSession->lastBlkId = -1;
  pSession->pGroupData = pGroup;
  pSession->pParam = pGcParam;
  pSession->downstreamIdx = pGcParam->downstreamIdx;
}
1275

1276
/*
 * Create the session described by `pParam` and return the stored context
 * through *ppSession.
 *
 * The session's group is looked up by table uid. A missing group is created
 * when the parameter carries child parameters or the downstream is not yet
 * fetch-done. If there is still no group afterwards, the function succeeds
 * without creating a session and leaves *ppSession untouched.
 */
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcOperatorParam* pGcParam = pParam->value;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  SGcSessionCtx newSession = {0};
  int32_t code = TSDB_CODE_SUCCESS;

  SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
  if (NULL == pGroup) {
    if (NULL != pParam->pChildren || !pCtx->fetchDone) {
      int32_t grpVgId = pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId;
      code = addNewGroupData(pOperator, pParam, &pGroup, grpVgId, pGcParam->tbUid);
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
    }
  }

  // Still no group: the downstream is done and has nothing for this table.
  if (NULL == pGroup) {
    return TSDB_CODE_SUCCESS;
  }

  initGroupCacheSessionCtx(&newSession, pGcParam, pGroup);

  code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &newSession, sizeof(newSession));
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // Hand out the copy owned by the hash table, not the local template.
  *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
  if (NULL == *ppSession) {
    qError("fail to get session %" PRId64 " from pSessions", pGcParam->sessionId);
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  qDebug("session:%" PRId64 " initialized, downstreamIdx:%d, vgId:%d, tbUid:%" PRId64 ", needCache:%d",
    pGcParam->sessionId, pGcParam->downstreamIdx, pGcParam->vgId, pGcParam->tbUid, pGcParam->needCache);

  return TSDB_CODE_SUCCESS;
}
1314

1315
/*
 * Entry point for reading one block through the group cache for the session
 * identified by pParam->value->sessionId.
 *
 * A new session is created on first use; if none can be created, the call
 * returns success with *ppRes untouched. For an existing session of a cached
 * group, the block handed out by the previous call (tracked in
 * blkCache.pReadBlk) is released back to the free list first. A NULL result
 * ends the session and removes it from pCtx->pSessions; otherwise the
 * session's row counter is advanced.
 */
static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcOperatorParam* pGcParam = pParam->value;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  int32_t code = TSDB_CODE_SUCCESS;

  SGcSessionCtx* pSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
  if (NULL == pSession) {
    code = initGroupCacheSession(pOperator, pParam, &pSession);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    if (NULL == pSession) {
      qDebug("session %" PRId64 " in downstream %d total got 0 rows since downtream fetch done", pGcParam->sessionId, pCtx->id);
      return TSDB_CODE_SUCCESS;
    }
  } else if (pSession->pGroupData->needCache) {
    // Recycle the block this session received on its previous call, if any.
    SSDataBlock** ppPrevBlk = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
    if (ppPrevBlk) {
      QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppPrevBlk));
      code = taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
      if (code) {
        qError("taosHashRemove session %" PRId64 " from pReadBlk failed, error: %s", pGcParam->sessionId, tstrerror(code));
        QRY_ERR_RET(code);
      }
    }
  }

  QRY_ERR_RET(getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes));
  if (NULL != *ppRes) {
    pSession->resRows += (*ppRes)->info.rows;
    qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
  } else {
    qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
    code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
    if (code) {
      qError("taosHashRemove session %" PRId64 " from pSessions failed, error: %s", pGcParam->sessionId, tstrerror(code));
      QRY_ERR_RET(code);
    }
  }

  return code;
}
1357

1358
/*
 * Allocate the zero-initialised per-downstream block counters
 * (one int64_t per downstream operator). Returns terrno on allocation failure.
 */
static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;

  pGCache->execInfo.pDownstreamBlkNum = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(int64_t));

  return (NULL == pGCache->execInfo.pDownstreamBlkNum) ? terrno : TSDB_CODE_SUCCESS;
}
1366

1367
static void freeRemoveGroupCacheData(void* p) {
2,677✔
1368
  SGroupCacheData* pGroup = p;
2,677✔
1369
  if (pGroup->vgId > 0 && pGroup->needCache) {
2,677!
1370
    SGcFileCacheCtx* pFileCtx = &pGroup->pVgCtx->fileCtx;
×
1371
    if (pGroup->fileId >= 0) {
×
1372
      SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
×
UNCOV
1373
      uint32_t remainNum = atomic_sub_fetch_32(&pFileInfo->groupNum, 1);
×
1374

UNCOV
1375
      qTrace("FileId:%d-%d-%d sub group num to %u", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId, remainNum);
×
1376

1377
      if (0 == remainNum && pGroup->fileId != pFileCtx->fileId) {
×
UNCOV
1378
        removeGroupCacheFile(pFileInfo);
×
1379

1380
#if 0
1381
        /* debug only */
1382
        snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", pGroup->fileId);
1383
        taosRemoveFile(pFileCtx->baseFilename);
1384
        /* debug only */
1385
#endif
1386

UNCOV
1387
        qTrace("FileId:%d-%d-%d removed", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId);
×
1388
        //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
1389
      }
1390
    }
1391
  }
1392

1393
  taosArrayDestroy(pGroup->waitQueue);
2,677✔
1394
  taosArrayDestroy(pGroup->blkList.pList);
2,677✔
1395
  (void)taosThreadMutexDestroy(&pGroup->mutex);
2,677✔
1396

1397
  qTrace("group removed");
2,677!
1398
}
2,677✔
1399

1400

1401

1402
/*
 * Allocate and initialise one SGcDownstreamCtx per downstream operator.
 *
 * For every downstream this sets up: the vgroup hash (with a default vgroup 0
 * entry in batch mode), the new-group list, the per-downstream group hash
 * (only when groups are not global), the session hash, the free-block list,
 * the wait-session hash, and the base name of the cache files
 * ("<tsTempDir>/gc_<pid>_<queryId>_<taskId>_<downstreamId>").
 *
 * Returns terrno (or the failing call's code) on the first failure; nothing
 * allocated so far is released in this function.
 */
static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;

  pGCache->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pGCache->pDownstreams));
  if (NULL == pGCache->pDownstreams) {
    return terrno;
  }
  pGCache->downstreamNum = pOperator->numOfDownstream;

  for (int32_t dsIdx = 0; dsIdx < pOperator->numOfDownstream; ++dsIdx) {
    SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[dsIdx];
    pCtx->id = dsIdx;
    pCtx->fetchSessionId = -1;
    pCtx->lastBlkUid = 0;

    // vgId -> vgroup context
    pCtx->pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pCtx->pVgTbHash) {
      return terrno;
    }
    tSimpleHashSetFreeFp(pCtx->pVgTbHash, freeSGcVgroupCtx);

    if (pGCache->batchFetch) {
      // Batch mode uses a single default vgroup (id 0).
      int32_t defaultVg = 0;
      SGcVgroupCtx vgCtx = {0};
      initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL);
      QRY_ERR_RET(tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)));
    }

    pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
    if (NULL == pCtx->pNewGrpList) {
      return terrno;
    }

    if (!pGCache->globalGrp) {
      pCtx->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
      if (NULL == pCtx->pGrpHash) {
        return terrno;
      }
      taosHashSetFreeFp(pCtx->pGrpHash, freeRemoveGroupCacheData);
    }

    pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (NULL == pCtx->pSessions) {
      return terrno;
    }
    taosHashSetFreeFp(pCtx->pSessions, freeSGcSessionCtx);

    pCtx->pFreeBlock = taosArrayInit(10, POINTER_BYTES);
    if (NULL == pCtx->pFreeBlock) {
      return terrno;
    }

    pCtx->pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (NULL == pCtx->pWaitSessions) {
      return terrno;
    }

    // Base file name; the last byte is forced to NUL and the length cached.
    char* pBaseName = pCtx->fileCtx.baseFilename;
    (void)snprintf(pBaseName, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d",
      tsTempDir, taosGetPId(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pCtx->id);
    pBaseName[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0;
    pCtx->fileCtx.baseNameLen = strlen(pBaseName);
  }

  return TSDB_CODE_SUCCESS;
}
1464

1465
/*
 * Operator "get next" callback: fetch one block for the session in `pParam`.
 *
 * *pRes is NULL when the session has no more data. On failure the error code
 * is stored in the task info and control leaves through T_LONG_JMP. While
 * cost.openCost is still 0, the elapsed time of the call (in milliseconds) is
 * recorded as the open cost.
 */
static int32_t groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
  SSDataBlock* pFetched = NULL;
  int64_t      startUs = 0;

  *pRes = NULL;

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  int32_t code = getBlkFromGroupCache(pOperator, &pFetched, pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  *pRes = pFetched;
  return code;
}
1489

1490
/*
 * Notification handler: drop the cached group identified by the table uid
 * carried in pParam->value (an SGcNotifyOperatorParam).
 *
 * The group is removed from the operator-wide hash when globalGrp is set,
 * otherwise from the hash of the downstream selected by downstreamIdx.
 *
 * Returns TSDB_CODE_SUCCESS when taosHashRemove() reports 0, and
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR (after logging) otherwise.
 */
static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam* pParam) {
  SGroupCacheOperatorInfo* pCache = pOperator->info;
  SGcNotifyOperatorParam*  pNotify = pParam->value;
  SHashObj*                pTargetHash = NULL;

  if (pCache->globalGrp) {
    pTargetHash = pCache->pGrpHash;
  } else {
    pTargetHash = pCache->pDownstreams[pNotify->downstreamIdx].pGrpHash;
  }

  qTrace("try to remove group %" PRIu64, pNotify->tbUid);

  int32_t rc = taosHashRemove(pTargetHash, &pNotify->tbUid, sizeof(pNotify->tbUid));
  if (0 == rc) {
    return TSDB_CODE_SUCCESS;
  }

  qError("failed to remove group %" PRIu64 " in vgId %d downstreamIdx %d", pNotify->tbUid, pNotify->vgId, pNotify->downstreamIdx);
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
1504

1505
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
11,035✔
1506
                                     SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1507
                                     SOperatorInfo** pOptrInfo) {
1508
  QRY_PARAM_CHECK(pOptrInfo);
11,035!
1509
  int32_t code = TSDB_CODE_SUCCESS;
11,035✔
1510

1511
  SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
11,035!
1512
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
11,035!
1513
  if (pOperator == NULL || pInfo == NULL) {
11,035!
1514
    code = terrno;
×
UNCOV
1515
    goto _error;
×
1516
  }
1517

1518
  pOperator->transparent = true;
11,035✔
1519
  
1520
  setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
11,035✔
1521

1522
  pInfo->maxCacheSize = 0;
11,035✔
1523
  pInfo->grpByUid = pPhyciNode->grpByUid;
11,035✔
1524
  pInfo->globalGrp = pPhyciNode->globalGrp;
11,035✔
1525
  pInfo->batchFetch = pPhyciNode->batchFetch;
11,035✔
1526
  
1527
  if (!pInfo->grpByUid) {
11,035!
1528
    qError("only group cache by uid is supported now");
×
1529
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1530
    goto _error;
×
1531
  }
1532
  
1533
  if (pPhyciNode->pGroupCols) {
11,035!
1534
    code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols);
×
1535
    if (code) {
×
UNCOV
1536
      goto _error;
×
1537
    }
1538
  }
1539

1540
  code = initGroupCacheBlockCache(pInfo);
11,035✔
1541
  if (code) {
11,035!
UNCOV
1542
    goto _error;
×
1543
  }
1544

1545
  if (pInfo->globalGrp) {
11,035!
1546
    pInfo->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1547
    if (pInfo->pGrpHash == NULL) {
×
1548
      code = terrno;
×
UNCOV
1549
      goto _error;
×
1550
    }
UNCOV
1551
    taosHashSetFreeFp(pInfo->pGrpHash, freeRemoveGroupCacheData);
×
1552
  }
1553

1554
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
11,035✔
1555
  if (TSDB_CODE_SUCCESS != code) {
11,035!
UNCOV
1556
    goto _error;
×
1557
  }
1558

1559
  code = initGroupCacheDownstreamCtx(pOperator);
11,035✔
1560
  if (TSDB_CODE_SUCCESS != code) {
11,035!
UNCOV
1561
    goto _error;
×
1562
  }
1563

1564
  code = initGroupCacheExecInfo(pOperator);
11,035✔
1565
  if (TSDB_CODE_SUCCESS != code) {
11,035!
UNCOV
1566
    goto _error;
×
1567
  }
1568

1569
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, groupCacheTableCacheEnd);
11,035✔
1570

1571
  qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch);
11,035!
1572

1573
  *pOptrInfo = pOperator;
11,035✔
1574
  return TSDB_CODE_SUCCESS;
11,035✔
1575

1576
_error:
×
1577
  if (pInfo != NULL) {
×
UNCOV
1578
    destroyGroupCacheOperator(pInfo);
×
1579
  }
1580

1581
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1582
  pTaskInfo->code = code;
×
UNCOV
1583
  return code;
×
1584
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc