• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/groupcacheoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "groupcache.h"
29

30

UNCOV
31
/*
 * Closes the cache file behind pFileInfo (when one is open), destroys the
 * mutex stored next to the descriptor, and flags the entry as deleted.
 */
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
  SGroupCacheFileFd* pFileFd = &pFileInfo->fd;

  if (NULL != pFileFd->fd) {
    if (taosCloseFile(&pFileFd->fd) < 0) {
      qError("close group cache file failed, fd:%p, error:%s", pFileFd->fd, tstrerror(terrno));
    }
    // Clear the descriptor regardless of the close result so it is never reused.
    pFileFd->fd = NULL;
    (void)taosThreadMutexDestroy(&pFileFd->mutex);
  }

  pFileInfo->deleted = true;
}
×
41

42

43
/*
 * Fills pCols from the column nodes in pList: one SGroupColInfo per node
 * (slot, var-data flag, byte width), the accumulated key buffer size and,
 * when grpColsMayBeNull is set, the extra bitmap size.
 *
 * The key buffer pBuf is only allocated when there is more than one column.
 * bufSize is accumulated with "+=", so pCols is expected to arrive with
 * bufSize already zeroed.
 * NOTE(review): bitMapSize divides by sizeof(int8_t) (i.e. 1), so it equals
 * colNum bytes -- confirm whether a bits-per-byte divisor was intended.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails.
 */
static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) {
  pCols->colNum = LIST_LENGTH(pList);
  pCols->withNull = grpColsMayBeNull;

  pCols->pColsInfo = taosMemoryMalloc(pCols->colNum * sizeof(SGroupColInfo));
  if (NULL == pCols->pColsInfo) {
    return terrno;
  }

  int32_t colIdx = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pCol = (SColumnNode*)pNode;

    pCols->pColsInfo[colIdx].slot = pCol->slotId;
    pCols->pColsInfo[colIdx].vardata = IS_VAR_DATA_TYPE(pCol->node.resType.type);
    pCols->pColsInfo[colIdx].bytes = pCol->node.resType.bytes;
    pCols->bufSize += pCol->node.resType.bytes;
    colIdx++;
  }

  if (pCols->withNull) {
    pCols->bitMapSize = pCols->colNum / sizeof(int8_t) + ((pCols->colNum % sizeof(int8_t)) ? 1 : 0);
    pCols->bufSize += pCols->bitMapSize;
  }

  if (pCols->colNum <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  pCols->pBuf = taosMemoryMalloc(pCols->bufSize);
  return (NULL == pCols->pBuf) ? terrno : TSDB_CODE_SUCCESS;
}
76

UNCOV
77
/*
 * Emits one debug line listing the per-downstream block counters kept in
 * execInfo.pDownstreamBlkNum. Does nothing when there are no downstreams, the
 * counter array is missing, or the temporary buffer cannot be allocated.
 */
static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
  if (pGrpCacheOperator->downstreamNum <= 0 || NULL == pGrpCacheOperator->execInfo.pDownstreamBlkNum) {
    return;
  }

  int32_t bufSize = pGrpCacheOperator->downstreamNum * 32 + 100;
  char*   buf = taosMemoryMalloc(bufSize);
  if (NULL == buf) {
    return;
  }

  int32_t offset = tsnprintf(buf, bufSize, "groupCache exec info, downstreamBlkNum:");
  for (int32_t i = 0; i < pGrpCacheOperator->downstreamNum; ++i) {
    // Stop appending once the buffer is full instead of writing past its end.
    if (offset >= bufSize) {
      break;
    }
    // The limit passed to the formatter must be the space left after offset,
    // not the size of the whole buffer.
    offset += tsnprintf(buf + offset, bufSize - offset, " %" PRId64, pGrpCacheOperator->execInfo.pDownstreamBlkNum[i]);
  }

  qDebug("%s", buf);
  taosMemoryFree(buf);
}
94

UNCOV
95
static void freeSGcSessionCtx(void* p) {
×
UNCOV
96
  SGcSessionCtx* pSession = p;
×
UNCOV
97
  if (pSession->semInit) {
×
98
    if (tsem_destroy(&pSession->waitSem) < 0) {
×
99
      qError("tsem_destroy session waitSem failed, error:%s", tstrerror(terrno));
×
100
    }
101
  }
UNCOV
102
}
×
103

UNCOV
104
static void freeSGroupCacheFileInfo(void* p) {
×
UNCOV
105
  SGroupCacheFileInfo* pFileInfo = p;
×
UNCOV
106
  if (pFileInfo->deleted) {
×
107
    return;
×
108
  }
109

UNCOV
110
  removeGroupCacheFile(pFileInfo);
×
111
}
112

UNCOV
113
/*
 * Releases the file-info hash owned by a file cache context.
 */
static void freeSGcFileCacheCtx(SGcFileCacheCtx* pFileCtx) {
  SHashObj* pFiles = pFileCtx->pCacheFile;
  taosHashCleanup(pFiles);
}
×
116

UNCOV
117
static void freeSGcVgroupCtx(void* p) {
×
UNCOV
118
  SGcVgroupCtx* pVgCtx = p;
×
UNCOV
119
  taosArrayDestroy(pVgCtx->pTbList);
×
UNCOV
120
  freeSGcFileCacheCtx(&pVgCtx->fileCtx);
×
UNCOV
121
}
×
122

UNCOV
123
static void freeGcBlockInList(void* p) {
×
UNCOV
124
  SSDataBlock** ppBlock = p;
×
UNCOV
125
  if (*ppBlock) {
×
UNCOV
126
    taosArrayDestroy((*ppBlock)->pDataBlock);
×
UNCOV
127
    taosMemoryFree(*ppBlock);
×
128
  }
UNCOV
129
}
×
130

UNCOV
131
/*
 * Releases every container owned by one downstream context: the new-group
 * list, the group and vgroup-table hashes, the free-block list (with its
 * blocks), both session hashes and the file cache context.
 */
static void freeSGcDownstreamCtx(SGcDownstreamCtx* pCtx) {
  // group bookkeeping
  taosArrayDestroy(pCtx->pNewGrpList);
  taosHashCleanup(pCtx->pGrpHash);
  tSimpleHashCleanup(pCtx->pVgTbHash);

  // pooled blocks
  taosArrayDestroyEx(pCtx->pFreeBlock, freeGcBlockInList);

  // sessions
  taosHashCleanup(pCtx->pSessions);
  taosHashCleanup(pCtx->pWaitSessions);

  // cache files
  freeSGcFileCacheCtx(&pCtx->fileCtx);
}
×
141

UNCOV
142
/*
 * Frees all downstream contexts of the operator and then the array that
 * holds them. A NULL array is a no-op.
 */
static void destroyGroupCacheDownstreamCtx(SGroupCacheOperatorInfo* pGrpCacheOperator) {
  if (NULL != pGrpCacheOperator->pDownstreams) {
    int32_t idx = 0;
    while (idx < pGrpCacheOperator->downstreamNum) {
      freeSGcDownstreamCtx(&pGrpCacheOperator->pDownstreams[idx]);
      ++idx;
    }

    taosMemoryFree(pGrpCacheOperator->pDownstreams);
  }
}
154

155

UNCOV
156
/*
 * Frees the per-column buffers of pDataBlock (pData plus either the var-data
 * offset array or the null bitmap, depending on the column type) and resets
 * the block's capacity and row count to zero. The column array and the block
 * itself are left allocated.
 */
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (NULL == pCol) {
      qError("fail to get %dth col in dataBlock, numOfCols:%d", colIdx, (int32_t)numOfCols);
      continue;
    }

    taosMemoryFreeClear(pCol->pData);

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      taosMemoryFreeClear(pCol->nullbitmap);
      continue;
    }

    taosMemoryFreeClear(pCol->varmeta.offset);
    pCol->varmeta.length = 0;
    pCol->varmeta.allocLen = 0;
  }

  pDataBlock->info.capacity = 0;
  pDataBlock->info.rows = 0;
}
×
176

177

178

UNCOV
179
/*
 * Tears down the block cache: drops the dirty-block hash, then walks the
 * read-block hash freeing the column buffers and the block stored in each
 * entry before the hash itself is cleaned up.
 */
static void destroySGcBlkCacheInfo(SGcBlkCacheInfo* pBlkCache) {
  taosHashCleanup(pBlkCache->pDirtyBlk);

  void* p = NULL;
  while (NULL != (p = taosHashIterate(pBlkCache->pReadBlk, p))) {
    SSDataBlock* pBlock = *(SSDataBlock**)p;
    // freeGcBlockInList() tolerates a NULL block pointer in an entry; apply
    // the same guard before the deep cleanup, which dereferences the block.
    if (NULL != pBlock) {
      blockDataDeepCleanup(pBlock);
    }
    freeGcBlockInList(p);
  }

  taosHashCleanup(pBlkCache->pReadBlk);
}
×
190

UNCOV
191
static void destroyGroupCacheOperator(void* param) {
×
UNCOV
192
  SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param;
×
193

UNCOV
194
  logGroupCacheExecInfo(pGrpCacheOperator);
×
195
  
UNCOV
196
  taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
×
UNCOV
197
  taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
×
198

UNCOV
199
  destroyGroupCacheDownstreamCtx(pGrpCacheOperator);
×
UNCOV
200
  destroySGcBlkCacheInfo(&pGrpCacheOperator->blkCache);
×
UNCOV
201
  taosHashCleanup(pGrpCacheOperator->pGrpHash);
×
202

UNCOV
203
  taosMemoryFree(pGrpCacheOperator->execInfo.pDownstreamBlkNum);
×
204
  
UNCOV
205
  taosMemoryFreeClear(param);
×
UNCOV
206
}
×
207

208
/*
 * Opens (creating it if needed) the cache file named filename and initialises
 * the mutex stored in pFileFd.
 *
 * pFileFd->fd is only set once both steps succeeded. If the mutex cannot be
 * initialised the freshly opened file is closed again, so a later cleanup
 * never sees a non-NULL fd paired with an uninitialised mutex.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the open fails, or the mutex init
 * error code.
 */
static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) {
  TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
  if (NULL == newFd) {
    QRY_ERR_RET(terrno);
  }

  int32_t code = taosThreadMutexInit(&pFileFd->mutex, NULL);
  if (code) {
    qError("taosThreadMutexInit failed, code:%x", code);
    // Do not publish the descriptor: release it here instead.
    if (taosCloseFile(&newFd) < 0) {
      qError("close group cache file failed, fd:%p, error:%s", newFd, tstrerror(terrno));
    }
    QRY_ERR_RET(code);
  }

  pFileFd->fd = newFd;

  qTrace("file path %s created", filename);

  return TSDB_CODE_SUCCESS;
}
225

UNCOV
226
/*
 * Looks up (creating on demand) the file entry for fileId in pFileCtx, opens
 * the backing file if it is not open yet, locks the entry's mutex and returns
 * the descriptor through ppFd.
 *
 * When the entry is flagged as deleted, *pDeleted is set to true, nothing is
 * locked and *ppFd is left untouched. *pDeleted is never written otherwise,
 * so callers must initialise it to false.
 *
 * On success with a live file the caller owns the lock and must hand the
 * descriptor back via releaseFdToFileCtx().
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, SGroupCacheFileFd** ppFd, bool* pDeleted) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == pFileCtx->pCacheFile) {
    pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    if (NULL == pFileCtx->pCacheFile) {
      return terrno;
    }
    taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo);
  }

  SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
  if (NULL == pTmp) {
    SGroupCacheFileInfo newFile = {0};
    if (taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile))) {
      QRY_ERR_RET(terrno);
    }
    pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
    if (NULL == pTmp) {
      qError("fail to get file %d from pCacheFile", fileId);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  }

  if (pTmp->deleted) {
    *pDeleted = true;
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pTmp->fd.fd) {
    // Build the "_<fileId>" suffix right before opening. The entry may have
    // been created earlier (e.g. by addFileRefTableNum) and baseFilename is a
    // shared buffer that is re-suffixed for every file id, so it may currently
    // name a different file.
    (void)snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", fileId);

    code = initOpenCacheFile(&pTmp->fd, pFileCtx->baseFilename);
    if (code) {
      return code;
    }
  }

  (void)taosThreadMutexLock(&pTmp->fd.mutex);
  *ppFd = &pTmp->fd;

  return TSDB_CODE_SUCCESS;
}
268

269
/*
 * Unlocks the mutex of a descriptor obtained from acquireFdFromFileCtx().
 * A NULL descriptor is ignored.
 */
static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
  if (NULL != pFd) {
    (void)taosThreadMutexUnlock(&pFd->mutex);
  }
}
275

UNCOV
276
/*
 * Steps *ppHead to the next buffer in the list and removes the buffer it
 * pointed at from the dirty-block hash.
 *
 * The block id and the next pointer are read before the removal, so the node
 * is not touched afterwards (presumably the hash entry owns the node --
 * verify against addBlkToDirtyBufList).
 *
 * Returns the taosHashRemove result; a failure is logged.
 */
static int32_t gcDropDirtyBlkAndAdvance(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo** ppHead) {
  int64_t blkId = (*ppHead)->basic.blkId;
  *ppHead = (*ppHead)->next;

  int32_t code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
  if (code) {
    qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
  }
  return code;
}

/*
 * Writes the dirty buffers linked from pHead to their cache files and removes
 * each of them from the dirty-block hash.
 *
 * For every buffer the target file context is the downstream's own context in
 * batch-fetch mode, otherwise the vgroup context of the buffer's group. A
 * buffer is dropped without being written when its group can no longer be
 * found in the group hash or when its file is flagged as deleted.
 *
 * Processing stops at the first error. On every exit path the acquired group
 * is released and blkCache.writeDownstreamId is reset from pCtx->id to -1.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
  int32_t code = TSDB_CODE_SUCCESS;
  SGroupCacheFileFd *pFd = NULL;
  SGcFileCacheCtx* pFileCtx = NULL;
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  int64_t lastGroupId = 0;
  SGroupCacheData* pGroup = NULL;

  while (NULL != pHead) {
    pFd = NULL;

    if (pGCache->batchFetch) {
      pFileCtx = &pHead->pCtx->fileCtx;
    } else {
      // Re-acquire the group only when the group id changes between buffers.
      if (pHead->groupId != lastGroupId) {
        if (NULL != pGroup) {
          taosHashRelease(pGrpHash, pGroup);
        }
        pGroup = taosHashAcquire(pGrpHash, &pHead->groupId, sizeof(pHead->groupId));
        lastGroupId = pHead->groupId;
      }

      if (NULL == pGroup) {
        qTrace("group %" PRIu64 " in downstream %d may already be deleted, skip write", pHead->groupId, pHead->pCtx->id);

        code = gcDropDirtyBlkAndAdvance(pGCache, &pHead);
        if (code) {
          goto _return;
        }
        continue;
      }

      pFileCtx = &pGroup->pVgCtx->fileCtx;
    }

    bool deleted = false;
    code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd, &deleted);
    if (code) {
      goto _return;
    }

    if (deleted) {
      releaseFdToFileCtx(pFd);

      qTrace("FileId:%d-%d-%d already be deleted, skip write",
          pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);

      code = gcDropDirtyBlkAndAdvance(pGCache, &pHead);
      if (code) {
        goto _return;
      }
      continue;
    }

    int64_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
    if (ret < 0) {
      // Capture terrno before any further call has a chance to change it.
      code = terrno;
      releaseFdToFileCtx(pFd);
      goto _return;
    }

    ret = taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
    if (ret != pHead->basic.bufSize) {
      code = terrno;
      releaseFdToFileCtx(pFd);
      goto _return;
    }

    releaseFdToFileCtx(pFd);

    qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64,
        pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset);

    code = gcDropDirtyBlkAndAdvance(pGCache, &pHead);
    if (code) {
      goto _return;
    }
  }

_return:

  if (NULL != pGroup) {
    taosHashRelease(pGrpHash, pGroup);
  }

  (void)atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);

  return code;
}
376

377

UNCOV
378
void freeGcBlkBufInfo(void* ptr) {
×
UNCOV
379
  SGcBlkBufInfo* pBlk = (SGcBlkBufInfo*)ptr;
×
UNCOV
380
  taosMemoryFreeClear(pBlk->pBuf);
×
UNCOV
381
}
×
382

383

UNCOV
384
/*
 * Stores *pBufInfo in the dirty-block hash, links the stored copy at the tail
 * of the dirty list and updates the cached byte count.
 *
 * When a cache limit is configured (maxCacheSize >= 0) and exceeded, and no
 * other downstream currently holds the writer role (writeDownstreamId == -1),
 * this call takes the role, detaches blocks from the head of the list until
 * the cached size is back within the limit, and writes them out through
 * saveBlocksToDisk() after dropping the list lock.
 *
 * If the hash insert fails the buffer held by pBufInfo is freed.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
  if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) {
    freeGcBlkBufInfo(pBufInfo);
    return terrno;
  }

  // Keep the caller's pointer until the lookup is known to have succeeded:
  // the error message below still needs the block id from it.
  SGcBlkBufInfo* pStored = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
  if (NULL == pStored) {
    qError("fail to get blk %" PRId64 " from pCache->pDirtyBlk", pBufInfo->basic.blkId);
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  // From here on work with the copy stored in the hash.
  pBufInfo = pStored;

  int32_t code = TSDB_CODE_SUCCESS;
  SGcBlkBufInfo* pWriteHead = NULL;

  taosWLockLatch(&pCache->dirtyLock);
  pCache->blkCacheSize += pBufInfo->basic.bufSize;
  qDebug("group cache total dirty block num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);

  if (NULL == pCache->pDirtyHead) {
    pCache->pDirtyHead = pBufInfo;
  } else {
    pCache->pDirtyTail->next = pBufInfo;
  }
  pCache->pDirtyTail = pBufInfo;

  if (pGCache->maxCacheSize >= 0 && pCache->blkCacheSize > pGCache->maxCacheSize) {
    if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) {
      pWriteHead = pCache->pDirtyHead;
      SGcBlkBufInfo* pTmp = pCache->pDirtyHead;
      while (NULL != pTmp) {
        pCache->blkCacheSize -= pTmp->basic.bufSize;
        if (pCache->blkCacheSize <= pGCache->maxCacheSize) {
          // Cut the list after pTmp: [pWriteHead .. pTmp] goes to disk, the
          // remainder stays as the dirty list.
          pCache->pDirtyHead = pTmp->next;
          pTmp->next = NULL;
          break;
        }
        pTmp = pTmp->next;
      }
    }
  }
  taosWUnLockLatch(&pCache->dirtyLock);

  if (NULL != pWriteHead) {
    code = saveBlocksToDisk(pGCache, pCtx, pWriteHead);
  }

  return code;
}
431

432
/*
 * Removes the cache file currently selected by pFileCtx->fileId when its
 * entry exists and its groupNum is zero. downstreamIdx and vgId are only used
 * for the trace message.
 */
static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) {
  SGroupCacheFileInfo* pCurr = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));

  if (NULL == pCurr || 0 != pCurr->groupNum) {
    return;
  }

  removeGroupCacheFile(pCurr);

#if 0  
  /* debug only */
  snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", pFileCtx->fileId);
  taosRemoveFile(pFileCtx->baseFilename);
  /* debug only */
#endif

  qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId);
}
452

453
/*
 * Moves pFileCtx on to the next file id once the current file has reached
 * GROUP_CACHE_DEFAULT_MAX_FILE_SIZE, resetting the tracked file size. With
 * removeCheck set, the current file is first checked for removal.
 */
static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId, bool removeCheck) {
  if (pFileCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
    if (removeCheck) {
      chkRemoveVgroupCurrFile(pFileCtx, downstreamIdx, vgId);
    }

    pFileCtx->fileId++;
    pFileCtx->fileSize = 0;
  }
}
465

466

UNCOV
467
/*
 * Serializes pBlock into a freshly allocated buffer, fills pBufInfo (block id,
 * file id, size, reserved file offset, owning downstream, group id) and adds
 * it to the dirty buffer list.
 *
 * The file context is the downstream's own context in batch-fetch mode,
 * otherwise the vgroup context of pGroup. The file offset is reserved by
 * atomically advancing that context's fileSize.
 *
 * The buffer is released again if serialization fails.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t);
  pBufInfo->pBuf = taosMemoryMalloc(bufSize);
  if (NULL == pBufInfo->pBuf) {
    qError("group cache add block to cache failed, size:%" PRId64, bufSize);
    return terrno;
  }

  int32_t code = blockDataToBuf(pBufInfo->pBuf, pBlock);
  if (code) {
    // Nothing has taken ownership of the buffer yet, so free it here.
    taosMemoryFreeClear(pBufInfo->pBuf);
    QRY_ERR_RET(code);
  }

  SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx;

  pBufInfo->next = NULL;
  pBufInfo->basic.blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1);
  pBufInfo->basic.fileId = pGCache->batchFetch ? pFileCtx->fileId : pGroup->fileId;
  pBufInfo->basic.bufSize = bufSize;
  pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize);
  pBufInfo->pCtx = pCtx;
  pBufInfo->groupId = pBlock->info.id.groupId;

  if (pGCache->batchFetch) {
    groupCacheSwitchNewFile(pFileCtx, pCtx->id, pGroup->vgId, false);
  }

  code = addBlkToDirtyBufList(pGCache, pCtx, &pGCache->blkCache, pBufInfo);

  return code;
}
495

UNCOV
496
/*
 * Detaches pDataBlock from its column buffers without freeing them: every
 * column's pData and either its var-data offset array or its null bitmap
 * pointer is set to NULL, and the block's capacity and row count are zeroed.
 */
void blockDataDeepClear(SSDataBlock* pDataBlock) {
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (NULL == pCol) {
      qError("fail to get %d col from pDataBlock, numOfCols:%d", colIdx, (int32_t)numOfCols);
      continue;
    }

    pCol->pData = NULL;

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->nullbitmap = NULL;
      continue;
    }

    pCol->varmeta.offset = NULL;
    pCol->varmeta.length = 0;
    pCol->varmeta.allocLen = 0;
  }

  pDataBlock->info.capacity = 0;
  pDataBlock->info.rows = 0;
}
×
516

UNCOV
517
/*
 * Allocates a new block that mirrors pSrc's column layout and block info but
 * holds no data: the column array is duplicated, the info is copied, and
 * blockDataDeepClear() then resets the data pointers, capacity and rows.
 *
 * On success *ppDst receives the new block. On failure *ppDst is set to NULL,
 * so callers that pass the address of a long-lived pointer are not left with
 * a reference to freed memory.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails.
 */
static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) {
  *ppDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *ppDst) {
    return terrno;
  }

  (*ppDst)->pBlockAgg = NULL;
  (*ppDst)->pDataBlock = taosArrayDup(pSrc->pDataBlock, NULL);
  if (NULL == (*ppDst)->pDataBlock) {
    // Read the error before freeing, and clear the out pointer.
    int32_t code = terrno;
    taosMemoryFreeClear(*ppDst);
    return code;
  }

  TAOS_MEMCPY(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info));
  blockDataDeepClear(*ppDst);

  return TSDB_CODE_SUCCESS;
}
533

UNCOV
534
/*
 * Hands out an empty block for the downstream: pops one from the free-block
 * list when available, otherwise builds a new one from pCtx->pBaseBlock.
 * The list is accessed under blkLock; building a new block happens after the
 * lock has been dropped.
 */
static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** ppRes) {
  taosWLockLatch(&pCtx->blkLock);

  if (taosArrayGetSize(pCtx->pFreeBlock) > 0) {
    SSDataBlock** ppPooled = (SSDataBlock**)taosArrayPop(pCtx->pFreeBlock);
    *ppRes = *ppPooled;
    taosWUnLockLatch(&pCtx->blkLock);
    return TSDB_CODE_SUCCESS;
  }

  taosWUnLockLatch(&pCtx->blkLock);
  return buildGroupCacheBaseBlock(ppRes, pCtx->pBaseBlock);
}
545

UNCOV
546
/*
 * Frees pBlock's column buffers and returns the emptied block to the
 * downstream's free-block list (under blkLock).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
 */
static int32_t releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;

  blockDataDeepCleanup(pBlock);

  taosWLockLatch(&pCtx->blkLock);
  void* pPushed = taosArrayPush(pCtx->pFreeBlock, &pBlock);
  if (NULL == pPushed) {
    code = terrno;
  }
  taosWUnLockLatch(&pCtx->blkLock);

  return code;
}
558

559

UNCOV
560
/*
 * Obtains an empty block for downstream downstreamIdx and fills it from the
 * serialized buffer pBuf. The block is returned through ppRes.
 */
static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, void* pBuf, SSDataBlock** ppRes) {
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];

  int32_t code = acquireBaseBlockFromList(pCtx, ppRes);
  if (TSDB_CODE_SUCCESS == code) {
    //TODO OPTIMIZE PERF
    code = blockDataFromBuf(*ppRes, pBuf);
  }

  return code;
}
568

UNCOV
569
/*
 * Reads the serialized block described by pBasic (file id, offset, size) from
 * its cache file into a newly allocated buffer returned through ppBuf. The
 * caller owns the buffer on success.
 *
 * The file context is the downstream's own context in batch-fetch mode,
 * otherwise the group's vgroup context. A file flagged as deleted is reported
 * as an internal error.
 *
 * On any failure *ppBuf is NULL (it is cleared after a failed read) and a
 * non-success code is returned, including for a short read that left terrno
 * unset.
 */
static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) {
  SGroupCacheFileFd *pFileFd = NULL;
  SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx;
  bool deleted = false;
  int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd, &deleted);
  if (code) {
    return code;
  }
  if (deleted) {
    // Nothing was locked for a deleted file, so there is nothing to release.
    qError("FileId:%d-%d-%d already be deleted, skip read", pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int64_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET);
  if (ret < 0) {
    code = terrno;
    goto _return;
  }

  *ppBuf = taosMemoryMalloc(pBasic->bufSize);
  if (NULL == *ppBuf) {
    code = terrno;
    goto _return;
  }

  ret = taosReadFile(pFileFd->fd, *ppBuf, pBasic->bufSize);
  if (ret != pBasic->bufSize) {
    // Read terrno before freeing. A short read may not set it at all; never
    // report success while handing back a NULL buffer.
    code = terrno;
    if (TSDB_CODE_SUCCESS == code) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    taosMemoryFreeClear(*ppBuf);
    goto _return;
  }

  qTrace("FileId:%d-%d-%d blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64,
      pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset);

_return:

  releaseFdToFileCtx(pFileFd);
  return code;
}
609

UNCOV
610
/*
 * Materializes the block identified by pBasic into *ppRes and records it in
 * the read-block hash under sessionId.
 *
 * The serialized data comes from the dirty-block hash when the block is still
 * there, otherwise it is read back from disk into a temporary buffer that is
 * freed once the result block has been built.
 */
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, SGcBlkBufBasic* pBasic, SSDataBlock** ppRes) {
  SGcBlkCacheInfo* pBlkCache = &pGCache->blkCache;
  int32_t          code = TSDB_CODE_SUCCESS;
  void*            pData = NULL;

  SGcBlkBufInfo* pDirty = taosHashAcquire(pBlkCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId));
  if (NULL != pDirty) {
    pData = pDirty->pBuf;
  } else {
    code = readBlockFromDisk(pGCache, pGrp, pBasic, &pData);
    if (code) {
      return code;
    }
  }

  code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pData, ppRes);
  taosHashRelease(pBlkCache->pDirtyBlk, pDirty);
  if (NULL == pDirty) {
    // Data came from disk: the temporary buffer is ours to free.
    taosMemoryFree(pData);
  }

  if (code) {
    return code;
  }

  QRY_ERR_RET(taosHashPut(pBlkCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES));

  return TSDB_CODE_SUCCESS;
}
639

640
/*
 * Initializes a vgroup context: stores the table list and vgroup id and
 * builds the base cache file name
 * "<tsTempDir>/gc_<pid>_<queryId>_<taskId>_<downstreamId>_<vgId>",
 * recording its length in baseNameLen.
 */
static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
  SGcFileCacheCtx* pFileCtx = &pVgCtx->fileCtx;

  pVgCtx->pTbList = pTbList;
  pVgCtx->id = vgId;

  (void)snprintf(pFileCtx->baseFilename, sizeof(pFileCtx->baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d",
     tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId);
  // Force termination in the last byte of the name buffer.
  pFileCtx->baseFilename[sizeof(pFileCtx->baseFilename) - 1] = 0;

  pFileCtx->baseNameLen = strlen(pFileCtx->baseFilename);
}
×
649

UNCOV
650
/*
 * Registers the new group pNew with its vgroup context.
 *
 * If the group has no vgroup context yet, one is created (with a table list
 * containing pNew), stored in pHash under pNew->vgId, and the group is
 * pointed at the stored copy. Otherwise pNew is appended to the existing
 * context's table list.
 *
 * The temporary table list is destroyed if it cannot be handed over to the
 * hash.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t addNewGroupToVgHash(SOperatorInfo* pOperator, SSHashObj* pHash, SGcNewGroupInfo* pNew) {
  SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx;
  if (NULL == pVgCtx) {
    SArray* pList = taosArrayInit(10, sizeof(*pNew));
    if (NULL == pList) {
      return terrno;
    }
    if (NULL == taosArrayPush(pList, pNew)) {
      int32_t pushCode = terrno;
      taosArrayDestroy(pList);
      // Never continue with the destroyed list, even if terrno was left unset.
      return (TSDB_CODE_SUCCESS != pushCode) ? pushCode : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SGcVgroupCtx vgCtx = {0};
    initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList);

    int32_t putCode = tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx));
    if (TSDB_CODE_SUCCESS != putCode) {
      // The hash did not take the context, so the list is still ours.
      taosArrayDestroy(pList);
      return putCode;
    }

    // From here on the list is reachable through the context stored in pHash.
    pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
    if (NULL == pNew->pGroup->pVgCtx) {
      qError("fail to get vg %d ctx from vgHash", pNew->vgId);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pVgCtx->pTbList, pNew)) {
    QRY_ERR_RET(terrno);
  }

  return TSDB_CODE_SUCCESS;
}
680

681
/*
 * Drains the downstream's pending new-group list under grpLock: every pending
 * group is registered with its vgroup context (non-batch-fetch mode only) and
 * the operator params attached to the groups are merged into a single param.
 *
 * *ppParam receives the merged param (NULL when no group carried one); it is
 * written on every path that reaches the end of the function, including
 * errors. The pending list is cleared only when all groups were processed.
 *
 * Every exit taken while the lock is held goes through _return so the lock is
 * always released.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) {
  int32_t code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];
  SOperatorParam* pDst = NULL;

  taosWLockLatch(&pCtx->grpLock);
  int32_t num = taosArrayGetSize(pCtx->pNewGrpList);
  if (num <= 0) {
    goto _return;
  }

  for (int32_t i = 0; i < num; ++i) {
    SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i);
    if (NULL == pNew) {
      qError("fail to get vg %d SGcNewGroupInfo from pNewGrpList", i);
      // Must not return directly here: grpLock is held.
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _return;
    }

    if (!pGCache->batchFetch) {
      code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew);
      if (code) {
        goto _return;
      }
    }

    if (NULL == pDst) {
      pDst = pNew->pParam;
    } else if (pNew->pParam) {
      code = mergeOperatorParams(pDst, pNew->pParam);
      if (code) {
        goto _return;
      }
    }
  }

  taosArrayClear(pCtx->pNewGrpList);

_return:

  taosWUnLockLatch(&pCtx->grpLock);
  *ppParam = pDst;

  return code;
}
726

727
/*
 * Fetches the next block from downstream downstreamIdx.
 *
 * Pending new groups are flushed first; when that yields a merged param the
 * downstream is driven through getNextExtFn, otherwise through getNextFn.
 * For a non-NULL block the per-downstream counter is bumped and, on the first
 * block seen, an empty template block (pBaseBlock) is built from it and added
 * to the free-block list.
 *
 * *ppRes receives the block (possibly NULL) only when the fetch itself
 * succeeded; the return value is then the result of blockDataCheck().
 */
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx,
                                                         SSDataBlock** ppRes) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SOperatorParam*          pParam = NULL;
  SSDataBlock*             pBlk = NULL;

  int32_t code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pParam);
  if (code) {
    return code;
  }

  SOperatorInfo* pDownstream = pOperator->pDownstream[downstreamIdx];
  if (NULL == pParam) {
    code = pDownstream->fpSet.getNextFn(pDownstream, &pBlk);
  } else {
    code = pDownstream->fpSet.getNextExtFn(pDownstream, pParam, &pBlk);
  }

  if (code) {
    qError("failed to get block from downstream, code:%s %s", tstrerror(code), GET_TASKID(pOperator->pTaskInfo));
    return code;
  }

  if (pBlk) {
    qDebug("%s res block retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlk->info.id.groupId);

    pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;

    SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];
    if (NULL == pCtx->pBaseBlock) {
      code = buildGroupCacheBaseBlock(&pCtx->pBaseBlock, pBlk);
      if (code) {
        return code;
      }

      if (NULL == taosArrayPush(pCtx->pFreeBlock, &pCtx->pBaseBlock)) {
        QRY_ERR_RET(terrno);
      }
    }
  }

  code = blockDataCheck(pBlk);

  *ppRes = pBlk;
  return code;
}
773

UNCOV
774
/*
 * Posts the wait semaphore of every session in pWaitQueue. A NULL or empty
 * queue is a no-op. Stops at the first missing entry or failed post and
 * returns that error.
 */
static int32_t notifyWaitingSessions(SArray* pWaitQueue) {
  if (NULL == pWaitQueue) {
    return TSDB_CODE_SUCCESS;
  }
  if (taosArrayGetSize(pWaitQueue) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t total = taosArrayGetSize(pWaitQueue);
  int32_t idx = 0;
  while (idx < total) {
    SGcSessionCtx* pWaiter = taosArrayGetP(pWaitQueue, idx);
    if (NULL == pWaiter) {
      qError("fail to get %d SGcSessionCtx in pWaitQueue, total:%d", idx, total);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    QRY_ERR_RET(tsem_post(&pWaiter->waitSem));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}
792

793
/*
 * Marks pGroup as completely fetched: clears its current block pointer, sets
 * fetchDone atomically, then (under the group mutex) wakes all sessions in
 * the wait queue and empties the queue.
 *
 * Returns the result of notifying the waiting sessions.
 */
static FORCE_INLINE int32_t handleGroupFetchDone(SGroupCacheData* pGroup) {
  pGroup->pBlock = NULL;
  atomic_store_8((int8_t*)&pGroup->fetchDone, true);

  (void)taosThreadMutexLock(&pGroup->mutex);
  int32_t notifyCode = notifyWaitingSessions(pGroup->waitQueue);
  taosArrayClear(pGroup->waitQueue);
  (void)taosThreadMutexUnlock(&pGroup->mutex);

  return notifyCode;
}
805

806
/*
 * Increments the group reference count kept for cache file `fileId`.
 *
 * The per-context file hash is created lazily on first use. When the file has
 * no entry yet, the "_<fileId>" suffix is written after the base file name and
 * a new entry with groupNum == 1 is inserted; otherwise the existing entry's
 * groupNum is incremented.
 *
 * downstreamId and vgId are only used for the trace message.
 * Returns TSDB_CODE_SUCCESS, or the error from hash creation / insertion / lookup.
 */
static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) {
  if (NULL == pFileCtx->pCacheFile) {
    pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    if (NULL == pFileCtx->pCacheFile) {
      return terrno;
    }
    taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo);
  }

  SGroupCacheFileInfo* pInfo = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
  if (NULL != pInfo) {
    pInfo->groupNum++;
  } else {
    // First reference to this file: refresh the name suffix and register the file.
    char*  pSuffix = &pFileCtx->baseFilename[pFileCtx->baseNameLen];
    size_t suffixCap = sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen;
    (void)snprintf(pSuffix, suffixCap, "_%u", fileId);

    SGroupCacheFileInfo newFile = {0};
    newFile.groupNum = 1;
    QRY_ERR_RET(taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)));

    // Re-read the entry so the trace below reports the stored copy.
    pInfo = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
    if (NULL == pInfo) {
      qError("fail to get file %d in pCacheFile", fileId);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  }

  qTrace("FileId:%d-%d-%d add groupNum to %u", downstreamId, vgId, fileId, pInfo->groupNum);

  return TSDB_CODE_SUCCESS;
}
835

UNCOV
836
/*
 * Called when a block for table `uid` arrives in non-batch mode.
 *
 * Nothing happens when `uid` equals the last uid recorded on either the
 * downstream context or the group's vgroup context. Otherwise both records are
 * updated, every table that precedes `uid` in the vgroup table list is marked
 * fetch-done, the vgroup cache file is switched, and the group's fileId and
 * startOffset are taken from the vgroup file context. When the group needs
 * caching, the file's group reference count is bumped as well.
 */
static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
  SGcVgroupCtx* pVgCtx = pGroup->pVgCtx;

  if (pCtx->lastBlkUid == uid || pVgCtx->lastBlkUid == uid) {
    return TSDB_CODE_SUCCESS;
  }

  pCtx->lastBlkUid = uid;
  pVgCtx->lastBlkUid = uid;

  // Walk the table list until `uid` itself (or the end of the list) is reached.
  for (int32_t idx = 0;; ++idx) {
    SGcNewGroupInfo* pPrev = taosArrayGet(pVgCtx->pTbList, idx);
    if (NULL == pPrev || pPrev->uid == uid) {
      break;
    }
    QRY_ERR_RET(handleGroupFetchDone(pPrev->pGroup));
  }

  SGcFileCacheCtx* pFileCtx = &pVgCtx->fileCtx;
  groupCacheSwitchNewFile(pFileCtx, pGroup->downstreamIdx, pGroup->vgId, true);

  pGroup->fileId = pFileCtx->fileId;
  pGroup->startOffset = pFileCtx->fileSize;

  qTrace("FileId:%d-%d-%d add groupNum for group %" PRIu64, pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId, uid);

  if (!pGroup->needCache) {
    return TSDB_CODE_SUCCESS;
  }

  return addFileRefTableNum(pFileCtx, pFileCtx->fileId, pGroup->downstreamIdx, pGroup->vgId);
}
866

867

868
/*
 * Initializes a freshly inserted group entry: its mutex, identity fields,
 * block list, and the pointer to its vgroup context (looked up by vgId in
 * pCtx->pVgTbHash; the lookup result is stored as-is).
 *
 * fileId and startOffset start at -1. batchFetch is currently unused here.
 * Returns the mutex-init error or the terrno of a failed block-list allocation.
 */
static FORCE_INLINE int32_t initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) {
  QRY_ERR_RET(taosThreadMutexInit(&pGroup->mutex, NULL));

  pGroup->fileId = -1;
  pGroup->vgId = vgId;
  pGroup->downstreamIdx = downstreamIdx;

  pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic));
  if (NULL == pGroup->blkList.pList) {
    QRY_ERR_RET(terrno);
  }

  pGroup->needCache = needCache;
  pGroup->startOffset = -1;
  pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));

  return TSDB_CODE_SUCCESS;
}
884

UNCOV
885
/*
 * Inserts a zeroed group entry for `uid` into the group hash (the operator-wide
 * hash when globalGrp is set, otherwise the downstream's own hash), initializes
 * it and returns it through *ppGrp.
 *
 * When pParam carries child parameters, the new group is also appended to the
 * downstream's new-group list (under grpLock) together with the first child
 * parameter; pParam->pChildren is then destroyed and the downstream is marked
 * as not fetch-done.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno / error code of the failing step.
 *
 * NOTE(review): a TSDB_CODE_DUP_KEY result from taosHashPut is tolerated and
 * the existing entry is then re-initialized by initNewGroupData(); confirm
 * that re-initializing a live group is intended.
 */
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  SGcOperatorParam*        pGcParam = pParam->value;
  SHashObj*                pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  SGroupCacheData          grpData = {0};

  // Any insertion failure other than "key already present" is fatal.
  if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData)) && TSDB_CODE_DUP_KEY != terrno) {
    return terrno;
  }

  *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
  if (NULL == *ppGrp) {
    return terrno;
  }
  QRY_ERR_RET(initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache));

  qDebug("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);

  if (pParam->pChildren) {
    // Zero-initialize so that no indeterminate bytes are copied into the list.
    SGcNewGroupInfo newGroup = {0};
    newGroup.pGroup = *ppGrp;
    newGroup.vgId = vgId;
    newGroup.uid = uid;
    newGroup.pParam = taosArrayGetP(pParam->pChildren, 0);

    taosWLockLatch(&pCtx->grpLock);
    if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
      taosWUnLockLatch(&pCtx->grpLock);
      return terrno;
    }
    taosWUnLockLatch(&pCtx->grpLock);

    taosArrayDestroy(pParam->pChildren);
    pParam->pChildren = NULL;
    pCtx->fetchDone = false;
  }

  return TSDB_CODE_SUCCESS;
}
936

UNCOV
937
/*
 * Appends the basic info of a newly cached block to the group's block list
 * and reports the index of the appended element through *pIdx.
 *
 * The list is modified under the block-list write latch. The latch is
 * released on every path, including a failed push (previously the failure
 * path returned with the latch still held).
 *
 * batchFetch is currently unused here.
 * Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
 */
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) {
  taosWLockLatch(&pGroup->blkList.lock);
  if (NULL == taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic)) {
    taosWUnLockLatch(&pGroup->blkList.lock);
    return terrno;
  }
  *pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1;
  taosWUnLockLatch(&pGroup->blkList.lock);

  qDebug("block added to group cache, total block num:%" PRId64, *pIdx + 1);

  return TSDB_CODE_SUCCESS;
}
949

UNCOV
950
/*
 * Routes one block fetched from the downstream operator to its group.
 *
 * - The group is looked up by the block's groupId. A missing group is created
 *   on the fly in batch-fetch mode (with needCache forced on) and is an
 *   error (TSDB_CODE_INVALID_PARA) otherwise.
 * - In non-batch mode, handleVgroupTableFetchDone() is called for the
 *   block's group.
 * - The block is either added to the buffer cache and the group's block list
 *   (needCache) or parked directly on the group as pBlock.
 * - Sessions waiting on the group are notified. When the block belongs to the
 *   fetching session's own group, *continueFetch is cleared and, for cached
 *   groups, the session's lastBlkId is set to the new block's index.
 */
static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
  SHashObj*                pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  int64_t                  blkIdx = 0;
  int32_t                  code = TSDB_CODE_SUCCESS;

  SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId));
  if (NULL == pGroup) {
    if (!pGCache->batchFetch) {
      qError("group %" PRIu64 " not found in group hash", pBlock->info.id.groupId);
      return TSDB_CODE_INVALID_PARA;
    }

    // Batch mode: fabricate the minimal parameters needed to register the group.
    SGcOperatorParam fakeGcParam = {0};
    SOperatorParam   fakeParam = {0};
    fakeGcParam.needCache = true;
    fakeParam.downstreamIdx = pSession->downstreamIdx;
    fakeParam.value = &fakeGcParam;

    code = addNewGroupData(pOperator, &fakeParam, &pGroup, GROUP_CACHE_DEFAULT_VGID, pBlock->info.id.groupId);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  if (!pGCache->batchFetch) {
    code = handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  if (!pGroup->needCache) {
    qDebug("no need to add block to group cache");

    pGroup->pBlock = pBlock;
  } else {
    qDebug("add block to group cache");

    SGcBlkBufInfo newBlkBuf;
    code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf);
    if (code) {
      return code;
    }

    code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &blkIdx);
    if (code) {
      return code;
    }
  }

  QRY_ERR_RET(notifyWaitingSessions(pGroup->waitQueue));

  if (pGroup != pSession->pGroupData) {
    return TSDB_CODE_SUCCESS;
  }

  // The fetching session got a block for its own group: stop fetching.
  if (pGroup->needCache) {
    pSession->lastBlkId = blkIdx;
  }
  *continueFetch = false;

  return TSDB_CODE_SUCCESS;
}
1012

UNCOV
1013
/*
 * Called when the downstream operator of pSession returns no more blocks.
 *
 * Batch-fetch mode: every group in the group hash is marked fetch-done and
 * the downstream context's fetchDone flag is set.
 * Otherwise: every table still listed in each vgroup context is marked
 * fetch-done and the vgroup table lists are cleared.
 * In both modes the downstream's waiting-session hash is cleared afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported by
 * handleGroupFetchDone(). On an error inside the group-hash walk the hash
 * iteration is cancelled before returning so that the iterator is not left
 * open on the current entry.
 */
static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];

  if (pGCache->batchFetch) {
    SHashObj*        pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
    SGroupCacheData* pGroup = NULL;
    while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) {
      int32_t code = handleGroupFetchDone(pGroup);
      if (TSDB_CODE_SUCCESS != code) {
        taosHashCancelIterate(pGrpHash, pGroup);
        QRY_ERR_RET(code);
      }
    }
    pCtx->fetchDone = true;
  } else {
    int32_t       uidNum = 0;
    SGcVgroupCtx* pVgCtx = NULL;
    int32_t       iter = 0;
    while (NULL != (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter))) {
      uidNum = taosArrayGetSize(pVgCtx->pTbList);
      for (int32_t i = 0; i < uidNum; ++i) {
        SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i);
        QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup));
      }
      taosArrayClear(pVgCtx->pTbList);
    }
  }

  taosHashClear(pCtx->pWaitSessions);

  return TSDB_CODE_SUCCESS;
}
1042

UNCOV
1043
/*
 * Pulls blocks from the downstream operator on behalf of session `sessionId`
 * until a block for the session's own group arrives (continueFetch cleared by
 * handleGroupCacheRetrievedBlk) or the downstream is exhausted.
 *
 * Afterwards the fetch ownership stored in pCtx->fetchSessionId is handed
 * over:
 *  - when the session got its block and another session is registered in
 *    pWaitSessions, ownership moves to that session, which is flagged with
 *    newFetch, removed from the wait hash and woken up;
 *  - otherwise ownership is reset to -1.
 * A failed compare-exchange means this session was not the owner and is
 * reported as an internal error.
 */
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    continueFetch = true;

  while (continueFetch && TSDB_CODE_SUCCESS == code) {
    QRY_ERR_RET(getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes));

    if (NULL != *ppRes) {
      QRY_ERR_RET(handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch));
      continue;
    }

    QRY_ERR_RET(handleDownstreamFetchDone(pOperator, pSession));
    break;
  }

  // Only look for a waiting session when this session has been served.
  SGcSessionCtx** ppWaitCtx = continueFetch ? NULL : taosHashIterate(pCtx->pWaitSessions, NULL);
  if (NULL != ppWaitCtx) {
    taosHashCancelIterate(pCtx->pWaitSessions, ppWaitCtx);
    int64_t* pSessionId = taosHashGetKey(ppWaitCtx, NULL);
    if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, *pSessionId)) {
      qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SGcSessionCtx* pWaitCtx = *ppWaitCtx;
    pWaitCtx->newFetch = true;
    code = taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
    if (code) {
      qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", *pSessionId, tstrerror(code));
      return code;
    }
    QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem));

    return code;
  }

  if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, -1)) {
    qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  return code;
}
1088

UNCOV
1089
/*
 * Tries to serve the next block of a session without touching the downstream.
 *
 * *got is set to true when the call produced a final answer for this round:
 *   - the session has no group: *ppRes = NULL (nothing to read);
 *   - cached group: the block following lastBlkId exists in the block list, is
 *     retrieved from the buffer cache and lastBlkId advances;
 *   - uncached group: a parked pBlock is handed out and cleared;
 *   - the group is marked fetchDone: *ppRes = NULL.
 * Otherwise *got is false and the caller has to fetch from the downstream.
 *
 * Returns TSDB_CODE_SUCCESS or the error from retrieveBlkFromBufCache().
 */
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes, bool* got) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGroupCacheData*         pGroup = pSession->pGroupData;

  *got = true;

  if (NULL == pGroup) {
    *ppRes = NULL;
    qDebug("sessionId: %" PRIu64 " fetch done since downstream fetch done", sessionId);
    return TSDB_CODE_SUCCESS;
  }

  if (pGroup->needCache) {
    SGcBlkList* pBlkList = &pGroup->blkList;
    taosRLockLatch(&pBlkList->lock);
    int64_t blkNum = taosArrayGetSize(pBlkList->pList);
    // A negative lastBlkId means nothing has been read yet, so start at index 0.
    int64_t nextIdx = (pSession->lastBlkId < 0) ? 0 : pSession->lastBlkId + 1;
    if (nextIdx < blkNum) {
      SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, nextIdx);
      taosRUnLockLatch(&pBlkList->lock);
      int32_t code = retrieveBlkFromBufCache(pGCache, pGroup, sessionId, pBasic, ppRes);
      pSession->lastBlkId = nextIdx;
      return code;
    }
    taosRUnLockLatch(&pBlkList->lock);
  } else if (pGroup->pBlock) {
    *ppRes = pGroup->pBlock;
    pGroup->pBlock = NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8((int8_t*)&pGroup->fetchDone)) {
    *ppRes = NULL;
    qDebug("sessionId: %" PRIu64 " fetch done", sessionId);
    return TSDB_CODE_SUCCESS;
  }

  *got = false;
  return TSDB_CODE_SUCCESS;
}
1135

1136

1137
/*
 * Blocks a session until another session has produced data for its group.
 *
 * The waiting logic is compiled out (`#if 1`): the active build only logs an
 * error and returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR. The disabled branch
 * is kept as the reference implementation of the wait protocol: enqueue the
 * session on the group's wait queue, release the group mutex, register in
 * pWaitSessions, sleep on the session semaphore, then either take over
 * fetching (newFetch) or read from the session cache.
 */
static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  // Session waiting is not expected to be used for now; treat it as an error.
#if 1
  qError("should not enter session wait");
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
#else
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGroupCacheData* pGroup = pSession->pGroupData;
  int32_t code = TSDB_CODE_SUCCESS;
  bool inLock = true;
  if (NULL == pGroup->waitQueue) {
    pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
    if (NULL == pGroup->waitQueue) {
      QRY_ERR_JRET(terrno);
    }
  }

  if (NULL == taosArrayPush(pGroup->waitQueue, &pSession)) {
    QRY_ERR_JRET(terrno);
  }

  if (!pSession->semInit) {
    QRY_ERR_JRET(tsem_init(&pSession->waitSem, 0, 0));
    pSession->semInit = true;
  }

  (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
  inLock = false;

  QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES));

  code = tsem_wait(&pSession->waitSem);
  if (code) {
    qError("tsem_wait failed, error:%s", tstrerror(code));
    QRY_ERR_JRET(code);
  }

  if (pSession->newFetch) {
    pSession->newFetch = false;
    return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
  }

  code = taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
  if (code) {
    qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", sessionId, tstrerror(code));
    QRY_ERR_JRET(code);
  }

  bool got = false;
  return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);

_return:

  if (inLock) {
    (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
  }

  return code;
#endif
}
1197

1198

UNCOV
1199
/*
 * Produces the next block for a session.
 *
 * The session cache is consulted first. When it has no answer, the session
 * must own (or be able to claim, via compare-exchange from -1) the
 * downstream's fetchSessionId and then fetches from the downstream operator
 * itself. Failing to own the fetch is currently treated as an internal error.
 *
 * The statements after the ownership if/else (session wait and the group
 * mutex handling) cannot be reached at present because both branches leave
 * the loop; they are retained together with groupCacheSessionWait().
 */
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
  int32_t                  code = TSDB_CODE_SUCCESS;
  bool                     grpMutexHeld = false;

  while (true) {
    bool got = false;
    code = getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
    if (got || TSDB_CODE_SUCCESS != code) {
      goto _return;
    }

    if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId)
      || (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) {
      if (grpMutexHeld) {
        (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
        grpMutexHeld = false;
      }

      code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
      goto _return;
    } else {
      // Another session owning the fetch is not expected for now.
      qError("Invalid fetchSessionId:%" PRId64 ", currentSessionId:%" PRId64, pCtx->fetchSessionId, sessionId);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    if (grpMutexHeld) {
      code = groupCacheSessionWait(pOperator, pCtx, sessionId, pSession, ppRes);
      grpMutexHeld = false;
      if (TSDB_CODE_SUCCESS != code) {
        goto _return;
      }

      break;
    }

    (void)taosThreadMutexLock(&pSession->pGroupData->mutex);
    grpMutexHeld = true;
  }

_return:

  if (grpMutexHeld) {
    (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
  }

  return code;
}
1250

1251

UNCOV
1252
/*
 * Creates the two hashes of the operator's block cache (dirty blocks and
 * blocks currently being read, both keyed with the BIGINT hash function) and
 * resets writeDownstreamId to -1.
 *
 * Only the dirty-block hash gets a free callback (freeGcBlkBufInfo).
 * Returns terrno when either hash cannot be created.
 */
static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
  SGcBlkCacheInfo* pBlkCache = &pInfo->blkCache;

  pBlkCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pBlkCache->pDirtyBlk) {
    return terrno;
  }
  taosHashSetFreeFp(pBlkCache->pDirtyBlk, freeGcBlkBufInfo);

  pBlkCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pBlkCache->pReadBlk) {
    return terrno;
  }

  pBlkCache->writeDownstreamId = -1;
  return TSDB_CODE_SUCCESS;
}
1267

1268
/*
 * Fills a session context from its operator parameter and group: stores the
 * parameter and group pointers, copies the downstream index from the
 * parameter, and sets lastBlkId to -1 (no block read yet).
 */
static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) {
  pSession->lastBlkId = -1;
  pSession->pGroupData = pGroup;
  pSession->pParam = pGcParam;
  pSession->downstreamIdx = pGcParam->downstreamIdx;
}
×
1274

UNCOV
1275
/*
 * Creates the session context for pParam's sessionId and returns the stored
 * copy through *ppSession.
 *
 * The session's group is looked up by table uid. A missing group is created
 * when the parameter carries child parameters or the downstream is not yet
 * fetch-done. When no group exists and none is created, the function returns
 * TSDB_CODE_SUCCESS without touching *ppSession.
 */
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
  SGcOperatorParam*        pGcParam = pParam->value;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  SHashObj*                pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  int32_t                  code = TSDB_CODE_SUCCESS;

  SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
  if (NULL == pGroup && (NULL != pParam->pChildren || !pCtx->fetchDone)) {
    int32_t grpVgId = pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId;
    code = addNewGroupData(pOperator, pParam, &pGroup, grpVgId, pGcParam->tbUid);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  if (NULL == pGroup) {
    return TSDB_CODE_SUCCESS;
  }

  SGcSessionCtx ctx = {0};
  initGroupCacheSessionCtx(&ctx, pGcParam, pGroup);

  code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // Hand out the copy owned by the hash, not the local template.
  *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
  if (NULL == *ppSession) {
    qError("fail to get session %" PRId64 " from pSessions", pGcParam->sessionId);
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  qDebug("session:%" PRId64 " initialized, downstreamIdx:%d, vgId:%d, tbUid:%" PRId64 ", needCache:%d", 
    pGcParam->sessionId, pGcParam->downstreamIdx, pGcParam->vgId, pGcParam->tbUid, pGcParam->needCache);

  return TSDB_CODE_SUCCESS;
}
1313

UNCOV
1314
/*
 * Returns the next block for the session described by pParam.
 *
 * - Unknown session: it is created first; when no session could be created
 *   the call succeeds without producing a block.
 * - Known session of a cached group: a block still registered for this
 *   session in blkCache.pReadBlk is released back to the downstream's block
 *   list and unregistered before the next one is read.
 *
 * A NULL result ends the session, which is then removed from pSessions;
 * otherwise the block's row count is added to the session's resRows.
 */
static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcOperatorParam*        pGcParam = pParam->value;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  int32_t                  code = TSDB_CODE_SUCCESS;

  SGcSessionCtx* pSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
  if (NULL == pSession) {
    code = initGroupCacheSession(pOperator, pParam, &pSession);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    if (NULL == pSession) {
      qDebug("session %" PRId64 " in downstream %d total got 0 rows since downtream fetch done", pGcParam->sessionId, pCtx->id);
      return TSDB_CODE_SUCCESS;
    }
  } else if (pSession->pGroupData->needCache) {
    SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
    if (ppBlock) {
      QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock));
      code = taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
      if (code) {
        qError("taosHashRemove session %" PRId64 " from pReadBlk failed, error: %s", pGcParam->sessionId, tstrerror(code));
        QRY_ERR_RET(code);
      }
    }
  }

  QRY_ERR_RET(getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes));

  if (NULL != *ppRes) {
    pSession->resRows += (*ppRes)->info.rows;
    qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
    return code;
  }

  // No more data: log the total before the session entry is removed.
  qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
  code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
  if (code) {
    qError("taosHashRemove session %" PRId64 " from pSessions failed, error: %s", pGcParam->sessionId, tstrerror(code));
    QRY_ERR_RET(code);
  }

  return code;
}
1356

UNCOV
1357
/*
 * Allocates the zero-initialized per-downstream block counters
 * (execInfo.pDownstreamBlkNum), one int64_t slot per downstream operator.
 * Returns terrno when the allocation fails.
 */
static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;

  pGCache->execInfo.pDownstreamBlkNum = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(int64_t));

  return (NULL == pGCache->execInfo.pDownstreamBlkNum) ? terrno : TSDB_CODE_SUCCESS;
}
1365

UNCOV
1366
static void freeRemoveGroupCacheData(void* p) {
×
UNCOV
1367
  SGroupCacheData* pGroup = p;
×
UNCOV
1368
  if (pGroup->vgId > 0 && pGroup->needCache) {
×
1369
    SGcFileCacheCtx* pFileCtx = &pGroup->pVgCtx->fileCtx;
×
1370
    if (pGroup->fileId >= 0) {
×
1371
      SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
×
1372
      uint32_t remainNum = atomic_sub_fetch_32(&pFileInfo->groupNum, 1);
×
1373

1374
      qTrace("FileId:%d-%d-%d sub group num to %u", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId, remainNum);
×
1375

1376
      if (0 == remainNum && pGroup->fileId != pFileCtx->fileId) {
×
1377
        removeGroupCacheFile(pFileInfo);
×
1378

1379
#if 0
1380
        /* debug only */
1381
        snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", pGroup->fileId);
1382
        taosRemoveFile(pFileCtx->baseFilename);
1383
        /* debug only */
1384
#endif
1385

1386
        qTrace("FileId:%d-%d-%d removed", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId);
×
1387
        //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
1388
      }
1389
    }
1390
  }
1391

UNCOV
1392
  taosArrayDestroy(pGroup->waitQueue);
×
UNCOV
1393
  taosArrayDestroy(pGroup->blkList.pList);
×
UNCOV
1394
  (void)taosThreadMutexDestroy(&pGroup->mutex);
×
1395

UNCOV
1396
  qTrace("group removed");
×
UNCOV
1397
}
×
1398

1399

1400

UNCOV
1401
/*
 * Allocates and initializes one SGcDownstreamCtx per downstream operator.
 *
 * Per context, in this order: id / fetchSessionId (-1) / lastBlkUid (0), the
 * vgroup hash (pre-populated with a default vgroup 0 entry in batch-fetch
 * mode), the new-group list, the per-downstream group hash (only when groups
 * are not global), the session hash, the free-block list, the waiting-session
 * hash, and finally the base cache file name built from the temp dir, process
 * id, query id, task id and downstream id.
 *
 * Returns terrno (or the failing call's code) on the first failure; contexts
 * initialized so far are left as they are.
 */
static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;

  pGCache->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pGCache->pDownstreams));
  if (NULL == pGCache->pDownstreams) {
    return terrno;
  }
  pGCache->downstreamNum = pOperator->numOfDownstream;

  for (int32_t idx = 0; idx < pOperator->numOfDownstream; ++idx) {
    SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[idx];
    pCtx->id = idx;
    pCtx->fetchSessionId = -1;
    pCtx->lastBlkUid = 0;

    pCtx->pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pCtx->pVgTbHash) {
      return terrno;
    }
    tSimpleHashSetFreeFp(pCtx->pVgTbHash, freeSGcVgroupCtx);

    if (pGCache->batchFetch) {
      // Batch mode keeps a single default vgroup context under key 0.
      int32_t      defaultVg = 0;
      SGcVgroupCtx vgCtx = {0};
      initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL);
      QRY_ERR_RET(tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)));
    }

    pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
    if (NULL == pCtx->pNewGrpList) {
      return terrno;
    }

    if (!pGCache->globalGrp) {
      pCtx->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
      if (NULL == pCtx->pGrpHash) {
        return terrno;
      }
      taosHashSetFreeFp(pCtx->pGrpHash, freeRemoveGroupCacheData);
    }

    pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (NULL == pCtx->pSessions) {
      return terrno;
    }
    taosHashSetFreeFp(pCtx->pSessions, freeSGcSessionCtx);

    pCtx->pFreeBlock = taosArrayInit(10, POINTER_BYTES);
    if (NULL == pCtx->pFreeBlock) {
      return terrno;
    }

    pCtx->pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (NULL == pCtx->pWaitSessions) {
      return terrno;
    }

    // Build the base file name, force termination, and remember its length
    // so that per-file suffixes can be appended after it later.
    (void)snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d", 
      tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pCtx->id);
    pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0;
    pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename);
  }

  return TSDB_CODE_SUCCESS;
}
1463

UNCOV
1464
/*
 * Operator "get next" entry point: fetches the next block for the session in
 * pParam and stores it in *pRes (NULL when there is none).
 *
 * While the operator's openCost is still 0, the elapsed time of this call is
 * recorded as openCost (microsecond difference divided by 1000.0). Any error
 * from getBlkFromGroupCache() is stored in the task info and raised through
 * T_LONG_JMP.
 */
static int32_t groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
  *pRes = NULL;

  SSDataBlock* pFetched = NULL;
  int64_t      startUs = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  int32_t code = getBlkFromGroupCache(pOperator, &pFetched, pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  *pRes = pFetched;
  return code;
}
1488

1489
/*
 * Handle a "table cache end" notification by removing the group keyed by the
 * notified table uid from the group hash.
 *
 * pOperator: the group cache operator; its info is an SGroupCacheOperatorInfo.
 * pParam:    notification parameter; its value is an SGcNotifyOperatorParam
 *            carrying tbUid, vgId and downstreamIdx.
 *
 * The operator-wide hash is used when globalGrp is set, otherwise the hash of
 * the downstream context selected by downstreamIdx.
 *
 * Returns TSDB_CODE_SUCCESS when the entry was removed, or
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when taosHashRemove() reports failure.
 */
static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam* pParam) {
  SGcNotifyOperatorParam*  pNotify = pParam->value;
  SGroupCacheOperatorInfo* pCache = pOperator->info;
  SHashObj*                pTargetHash = NULL;

  if (pCache->globalGrp) {
    pTargetHash = pCache->pGrpHash;
  } else {
    pTargetHash = pCache->pDownstreams[pNotify->downstreamIdx].pGrpHash;
  }

  qTrace("try to remove group %" PRIu64, pNotify->tbUid);

  if (0 == taosHashRemove(pTargetHash, &pNotify->tbUid, sizeof(pNotify->tbUid))) {
    return TSDB_CODE_SUCCESS;
  }

  qError("failed to remove group %" PRIu64 " in vgId %d downstreamIdx %d", pNotify->tbUid, pNotify->vgId, pNotify->downstreamIdx);
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
1503

UNCOV
1504
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
×
1505
                                     SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1506
                                     SOperatorInfo** pOptrInfo) {
UNCOV
1507
  QRY_PARAM_CHECK(pOptrInfo);
×
UNCOV
1508
  int32_t code = TSDB_CODE_SUCCESS;
×
1509

UNCOV
1510
  SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
×
UNCOV
1511
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
UNCOV
1512
  if (pOperator == NULL || pInfo == NULL) {
×
1513
    code = terrno;
×
1514
    goto _error;
×
1515
  }
1516

UNCOV
1517
  pOperator->transparent = true;
×
1518
  
UNCOV
1519
  setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
×
1520

UNCOV
1521
  pInfo->maxCacheSize = 0;
×
UNCOV
1522
  pInfo->grpByUid = pPhyciNode->grpByUid;
×
UNCOV
1523
  pInfo->globalGrp = pPhyciNode->globalGrp;
×
UNCOV
1524
  pInfo->batchFetch = pPhyciNode->batchFetch;
×
1525
  
UNCOV
1526
  if (!pInfo->grpByUid) {
×
1527
    qError("only group cache by uid is supported now");
×
1528
    code = TSDB_CODE_INVALID_PARA;
×
1529
    goto _error;
×
1530
  }
1531
  
UNCOV
1532
  if (pPhyciNode->pGroupCols) {
×
1533
    code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols);
×
1534
    if (code) {
×
1535
      goto _error;
×
1536
    }
1537
  }
1538

UNCOV
1539
  code = initGroupCacheBlockCache(pInfo);
×
UNCOV
1540
  if (code) {
×
1541
    goto _error;
×
1542
  }
1543

UNCOV
1544
  if (pInfo->globalGrp) {
×
1545
    pInfo->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1546
    if (pInfo->pGrpHash == NULL) {
×
1547
      code = terrno;
×
1548
      goto _error;
×
1549
    }
1550
    taosHashSetFreeFp(pInfo->pGrpHash, freeRemoveGroupCacheData);
×
1551
  }
1552

UNCOV
1553
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
×
UNCOV
1554
  if (TSDB_CODE_SUCCESS != code) {
×
1555
    goto _error;
×
1556
  }
1557

UNCOV
1558
  code = initGroupCacheDownstreamCtx(pOperator);
×
UNCOV
1559
  if (TSDB_CODE_SUCCESS != code) {
×
1560
    goto _error;
×
1561
  }
1562

UNCOV
1563
  code = initGroupCacheExecInfo(pOperator);
×
UNCOV
1564
  if (TSDB_CODE_SUCCESS != code) {
×
1565
    goto _error;
×
1566
  }
1567

UNCOV
1568
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, groupCacheTableCacheEnd);
×
1569

UNCOV
1570
  qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch);
×
1571

UNCOV
1572
  *pOptrInfo = pOperator;
×
UNCOV
1573
  return TSDB_CODE_SUCCESS;
×
1574

1575
_error:
×
1576
  if (pInfo != NULL) {
×
1577
    destroyGroupCacheOperator(pInfo);
×
1578
  }
1579

1580
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1581
  pTaskInfo->code = code;
×
1582
  return code;
×
1583
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc