• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5071

17 May 2026 01:15AM UTC coverage: 63.054% (-10.3%) from 73.326%
#5071

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

238317 of 377957 relevant lines covered (63.05%)

130539817.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.04
/source/libs/executor/src/groupcacheoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "groupcache.h"
29

30

31
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
161,267✔
32
  if (pFileInfo->fd.fd) {
161,267✔
33
    if (taosCloseFile(&pFileInfo->fd.fd) < 0) {
161,267✔
34
      qError("close group cache file failed, fd:%p, error:%s", pFileInfo->fd.fd, tstrerror(terrno));
×
35
    }
36
    pFileInfo->fd.fd = NULL;
161,267✔
37
    (void)taosThreadMutexDestroy(&pFileInfo->fd.mutex);
161,267✔
38
  }
39
  pFileInfo->deleted = true;
161,267✔
40
}
161,267✔
41

42

43
static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) {
×
44
  pCols->colNum = LIST_LENGTH(pList);
×
45
  pCols->withNull = grpColsMayBeNull;  
×
46
  pCols->pColsInfo = taosMemoryMalloc(pCols->colNum * sizeof(SGroupColInfo));
×
47
  if (NULL == pCols->pColsInfo) {
×
48
    return terrno;
×
49
  }
50

51
  int32_t i = 0;
×
52
  SNode* pNode = NULL;
×
53
  FOREACH(pNode, pList) {
×
54
    SColumnNode* pColNode = (SColumnNode*)pNode;
×
55
    pCols->pColsInfo[i].slot = pColNode->slotId;
×
56
    pCols->pColsInfo[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
×
57
    pCols->pColsInfo[i].bytes = pColNode->node.resType.bytes;
×
58
    pCols->bufSize += pColNode->node.resType.bytes;
×
59
    ++i;
×
60
  }  
61

62
  if (pCols->withNull) {
×
63
    pCols->bitMapSize = pCols->colNum / sizeof(int8_t) + ((pCols->colNum % sizeof(int8_t)) ? 1 : 0);
×
64
    pCols->bufSize += pCols->bitMapSize;
×
65
  }
66

67
  if (pCols->colNum > 1) {
×
68
    pCols->pBuf = taosMemoryMalloc(pCols->bufSize);
×
69
    if (NULL == pCols->pBuf) {
×
70
      return terrno;
×
71
    }
72
  }
73

74
  return TSDB_CODE_SUCCESS;
×
75
}
76

77
static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
1,355,564✔
78
  if (pGrpCacheOperator->downstreamNum <= 0 || NULL == pGrpCacheOperator->execInfo.pDownstreamBlkNum) {
1,355,564✔
79
    return;
×
80
  }
81

82
  int32_t bufSize = pGrpCacheOperator->downstreamNum * 32 + 100;
1,355,564✔
83
  char* buf = taosMemoryMalloc(bufSize);
1,355,564✔
84
  if (NULL == buf) {
1,355,564✔
85
    return;
×
86
  }
87
  int32_t offset = snprintf(buf, bufSize, "groupCache exec info, downstreamBlkNum:");
1,355,564✔
88
  for (int32_t i = 0; i < pGrpCacheOperator->downstreamNum; ++i) {
4,066,692✔
89
    offset += snprintf(buf + offset, bufSize - offset, " %" PRId64, pGrpCacheOperator->execInfo.pDownstreamBlkNum[i]);
2,711,128✔
90
  }
91
  qDebug("%s", buf);
1,355,564✔
92
  taosMemoryFree(buf);
1,355,564✔
93
}
94

95
static void freeSGcSessionCtx(void* p) {
8,475,464✔
96
  SGcSessionCtx* pSession = p;
8,475,464✔
97
  if (pSession->semInit) {
8,475,464✔
98
    if (tsem_destroy(&pSession->waitSem) < 0) {
×
99
      qError("tsem_destroy session waitSem failed, error:%s", tstrerror(terrno));
×
100
    }
101
  }
102
}
8,475,464✔
103

104
static void freeSGroupCacheFileInfo(void* p) {
161,267✔
105
  SGroupCacheFileInfo* pFileInfo = p;
161,267✔
106
  if (pFileInfo->deleted) {
161,267✔
107
    return;
×
108
  }
109

110
  removeGroupCacheFile(pFileInfo);
161,267✔
111
}
112

113
static void freeSGcFileCacheCtx(SGcFileCacheCtx* pFileCtx) {
5,422,256✔
114
  taosHashCleanup(pFileCtx->pCacheFile);
5,422,256✔
115
  pFileCtx->pCacheFile = NULL;
5,422,256✔
116
}
5,422,256✔
117

118
static void freeSGcVgroupCtx(void* p) {
2,711,128✔
119
  SGcVgroupCtx* pVgCtx = p;
2,711,128✔
120
  taosArrayDestroy(pVgCtx->pTbList);
2,711,128✔
121
  freeSGcFileCacheCtx(&pVgCtx->fileCtx);
2,711,128✔
122
}
2,711,128✔
123

124
static void freeGcBlockInList(void* p) {
4,135,773✔
125
  SSDataBlock** ppBlock = p;
4,135,773✔
126
  if (*ppBlock) {
4,135,773✔
127
    taosArrayDestroy((*ppBlock)->pDataBlock);
4,135,773✔
128
    taosMemoryFree(*ppBlock);
4,135,773✔
129
  }
130
}
4,135,773✔
131

132
static void freeSGcDownstreamCtx(SGcDownstreamCtx* pCtx) {
2,711,128✔
133
  taosArrayDestroy(pCtx->pNewGrpList);
2,711,128✔
134
  taosHashCleanup(pCtx->pGrpHash);
2,711,128✔
135
  tSimpleHashCleanup(pCtx->pVgTbHash);
2,711,128✔
136

137
  taosArrayDestroyEx(pCtx->pFreeBlock, freeGcBlockInList);
2,711,128✔
138
  taosHashCleanup(pCtx->pSessions);
2,711,128✔
139
  taosHashCleanup(pCtx->pWaitSessions);
2,711,128✔
140
  freeSGcFileCacheCtx(&pCtx->fileCtx);
2,711,128✔
141
}
2,711,128✔
142

143
static void destroyGroupCacheDownstreamCtx(SGroupCacheOperatorInfo* pGrpCacheOperator) {
1,355,564✔
144
  if (NULL == pGrpCacheOperator->pDownstreams) {
1,355,564✔
145
    return;
×
146
  }
147
  
148
  for (int32_t i = 0; i < pGrpCacheOperator->downstreamNum; ++i) {
4,066,692✔
149
    SGcDownstreamCtx* pCtx = &pGrpCacheOperator->pDownstreams[i];
2,711,128✔
150
    freeSGcDownstreamCtx(pCtx);
2,711,128✔
151
  }
152

153
  taosMemoryFree(pGrpCacheOperator->pDownstreams);
1,355,564✔
154
}
155

156

157
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
8,276,996✔
158
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
8,276,996✔
159
  for (int32_t i = 0; i < numOfCols; ++i) {
31,513,224✔
160
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
23,236,228✔
161
    if (NULL == p) {
23,236,228✔
162
      qError("fail to get %dth col in dataBlock, numOfCols:%d", i, (int32_t)numOfCols);
×
163
      continue;
×
164
    }
165
    taosMemoryFreeClear(p->pData);
23,236,228✔
166
    if (IS_VAR_DATA_TYPE(p->info.type)) {
23,236,228✔
167
      taosMemoryFreeClear(p->varmeta.offset);
216,638✔
168
      p->varmeta.length = 0;
216,638✔
169
      p->varmeta.allocLen = 0;
216,638✔
170
    } else {
171
      taosMemoryFreeClear(p->nullbitmap);
23,019,590✔
172
    }
173
  }
174
  pDataBlock->info.capacity = 0;
8,276,996✔
175
  pDataBlock->info.rows = 0;
8,276,996✔
176
}
8,276,996✔
177

178

179

180
static void destroySGcBlkCacheInfo(SGcBlkCacheInfo* pBlkCache) {
1,355,564✔
181
  taosHashCleanup(pBlkCache->pDirtyBlk);
1,355,564✔
182

183
  void* p = NULL;
1,355,564✔
184
  while (NULL != (p = taosHashIterate(pBlkCache->pReadBlk, p))) {
5,342,625✔
185
    blockDataDeepCleanup(*(SSDataBlock**)p);
3,987,061✔
186
    freeGcBlockInList(p);
3,987,061✔
187
  }
188

189
  taosHashCleanup(pBlkCache->pReadBlk);
1,355,564✔
190
}
1,355,564✔
191

192
static void destroyGroupCacheOperator(void* param) {
1,355,564✔
193
  SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param;
1,355,564✔
194

195
  logGroupCacheExecInfo(pGrpCacheOperator);
1,355,564✔
196
  
197
  taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
1,355,564✔
198
  taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
1,355,564✔
199

200
  destroyGroupCacheDownstreamCtx(pGrpCacheOperator);
1,355,564✔
201
  destroySGcBlkCacheInfo(&pGrpCacheOperator->blkCache);
1,355,564✔
202
  taosHashCleanup(pGrpCacheOperator->pGrpHash);
1,355,564✔
203

204
  taosMemoryFree(pGrpCacheOperator->execInfo.pDownstreamBlkNum);
1,355,564✔
205
  
206
  taosMemoryFreeClear(param);
1,355,564✔
207
}
1,355,564✔
208

209
static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) {
210
  TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
161,267✔
211
  //TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE);
212
  if (NULL == newFd) {
161,267✔
213
    QRY_ERR_RET(terrno);
×
214
  }
215
  pFileFd->fd = newFd;
161,267✔
216
  int32_t code = taosThreadMutexInit(&pFileFd->mutex, NULL);
161,267✔
217
  if (code) {
161,267✔
218
    qError("taosThreadMutexInit failed, code:%x", code);
×
219
    QRY_ERR_RET(code);
×
220
  }
221

222
  qTrace("file path %s created", filename);
161,267✔
223
  
224
  return TSDB_CODE_SUCCESS;
161,267✔
225
}
226

227
static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, SGroupCacheFileFd** ppFd, bool* pDeleted) {
8,922,347✔
228
  int32_t code = TSDB_CODE_SUCCESS;
8,922,347✔
229
  if (NULL == pFileCtx->pCacheFile) {
8,922,347✔
230
    pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
161,267✔
231
    if (NULL == pFileCtx->pCacheFile) {
161,267✔
232
      return terrno;
×
233
    }
234
    taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo);
161,267✔
235
  }
236
  
237
  SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
8,922,347✔
238
  if (NULL == pTmp) {
8,922,347✔
239
    (void)snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", fileId);
161,267✔
240

241
    SGroupCacheFileInfo newFile = {0};
161,267✔
242
    if (taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile))) {
161,267✔
243
      QRY_ERR_RET(terrno);
×
244
    }
245
    pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
161,267✔
246
    if (NULL == pTmp) {
161,267✔
247
      qError("fail to get file %d from pCacheFile", fileId);
×
248
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
249
    }
250
  }
251

252
  if (pTmp->deleted) {
8,922,347✔
253
    *pDeleted = true;
×
254
    return TSDB_CODE_SUCCESS;
×
255
  }
256

257
  if (NULL == pTmp->fd.fd) {
8,922,347✔
258
    code = initOpenCacheFile(&pTmp->fd, pFileCtx->baseFilename);
161,267✔
259
    if (code) {
161,267✔
260
      return code;
×
261
    }
262
  }
263

264
  (void)taosThreadMutexLock(&pTmp->fd.mutex);
8,922,347✔
265
  *ppFd = &pTmp->fd;
8,922,347✔
266
  
267
  return TSDB_CODE_SUCCESS;
8,922,347✔
268
}
269

270
static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
271
  if (NULL == pFd) {
8,922,347✔
272
    return;
×
273
  }
274
  (void)taosThreadMutexUnlock(&pFd->mutex);
8,922,347✔
275
}
276

277
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
645,351✔
278
  int32_t code = TSDB_CODE_SUCCESS;
645,351✔
279
  SGroupCacheFileFd *pFd = NULL;
645,351✔
280
  SGcFileCacheCtx* pFileCtx = NULL;
645,351✔
281
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
645,351✔
282
  int64_t lastGroupId = 0;
645,351✔
283
  SGroupCacheData* pGroup = NULL;
645,351✔
284
  
285
  while (NULL != pHead) {
1,290,702✔
286
    pFd = NULL;
645,351✔
287
    
288
    if (pGCache->batchFetch) {
645,351✔
289
      pFileCtx = &pHead->pCtx->fileCtx;
645,351✔
290
    } else {
291
      if (pHead->groupId != lastGroupId) {
×
292
        if (NULL != pGroup) {
×
293
          taosHashRelease(pGrpHash, pGroup);
×
294
        }
295
        pGroup = taosHashAcquire(pGrpHash, &pHead->groupId, sizeof(pHead->groupId));      
×
296
        lastGroupId = pHead->groupId;
×
297
      }
298
    
299
      if (NULL == pGroup) {
×
300
        qTrace("group %" PRIu64 " in downstream %d may already be deleted, skip write", pHead->groupId, pHead->pCtx->id);
×
301

302
        int64_t blkId = pHead->basic.blkId;
×
303
        pHead = pHead->next;
×
304
        code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
×
305
        if (code) {
×
306
          qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
×
307
          goto _return;
×
308
        }
309
        continue;
×
310
      }
311
      
312
      pFileCtx = &pGroup->pVgCtx->fileCtx;
×
313
    }
314

315
    bool deleted = false;
645,351✔
316
    code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd, &deleted);
645,351✔
317
    if (code) {
645,351✔
318
      goto _return;
×
319
    }
320

321
    if (deleted) {
645,351✔
322
      releaseFdToFileCtx(pFd);
×
323

324
      qTrace("FileId:%d-%d-%d already be deleted, skip write", 
×
325
          pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);
326
      
327
      int64_t blkId = pHead->basic.blkId;
×
328
      pHead = pHead->next;
×
329
      
330
      code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
×
331
      if (code) {
×
332
        qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
×
333
        goto _return;
×
334
      }
335
      continue;
×
336
    }
337
    
338
    int64_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
645,351✔
339
    if (ret < 0) {
645,351✔
340
      releaseFdToFileCtx(pFd);
×
341
      code = terrno;
×
342
      goto _return;
×
343
    }
344
    
345
    ret = taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
645,351✔
346
    if (ret != pHead->basic.bufSize) {
645,351✔
347
      releaseFdToFileCtx(pFd);
×
348
      code = terrno;
×
349
      goto _return;
×
350
    }
351
    
352
    releaseFdToFileCtx(pFd);
645,351✔
353

354
    qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, 
645,351✔
355
        pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset);
356
    
357
    int64_t blkId = pHead->basic.blkId;
645,351✔
358
    pHead = pHead->next;
645,351✔
359

360
    code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
645,351✔
361
    if (code) {
645,351✔
362
      qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
×
363
      goto _return;
×
364
    }
365
  }
366

367
_return:
645,351✔
368

369
  if (NULL != pGroup) {
645,351✔
370
    taosHashRelease(pGrpHash, pGroup);
×
371
  }
372

373
  (void)atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);
645,351✔
374

375
  return code;
645,351✔
376
}
377

378

379
void freeGcBlkBufInfo(void* ptr) {
645,351✔
380
  SGcBlkBufInfo* pBlk = (SGcBlkBufInfo*)ptr;
645,351✔
381
  taosMemoryFreeClear(pBlk->pBuf);
645,351✔
382
}
645,351✔
383

384

385
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
645,351✔
386
  if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) {
645,351✔
387
    freeGcBlkBufInfo(pBufInfo);
×
388
    return terrno;
×
389
  }
390
  pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
645,351✔
391
  if (NULL == pBufInfo) {
645,351✔
392
    qError("fail to get blk %" PRId64 " from pCache->pDirtyBlk", pBufInfo->basic.blkId);
×
393
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
394
  }
395
  int32_t code = TSDB_CODE_SUCCESS;
645,351✔
396
  SGcBlkBufInfo* pWriteHead = NULL;
645,351✔
397
  
398
  taosWLockLatch(&pCache->dirtyLock);
645,351✔
399
  pCache->blkCacheSize += pBufInfo->basic.bufSize;
645,351✔
400
  qDebug("group cache total dirty block num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);
645,351✔
401

402
  if (NULL == pCache->pDirtyHead) {
645,351✔
403
    pCache->pDirtyHead = pBufInfo;
645,351✔
404
  } else {
405
    pCache->pDirtyTail->next = pBufInfo;
×
406
  }
407
  pCache->pDirtyTail = pBufInfo;
645,351✔
408
    
409
  if (pGCache->maxCacheSize >= 0 && pCache->blkCacheSize > pGCache->maxCacheSize) {
645,351✔
410
    if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) {
645,351✔
411
      pWriteHead = pCache->pDirtyHead;
645,351✔
412
      SGcBlkBufInfo* pTmp = pCache->pDirtyHead;
645,351✔
413
      while (NULL != pTmp) {
645,351✔
414
        pCache->blkCacheSize -= pTmp->basic.bufSize;
645,351✔
415
        if (pCache->blkCacheSize <= pGCache->maxCacheSize) {
645,351✔
416
          pCache->pDirtyHead = pTmp->next;
645,351✔
417
          pTmp->next = NULL;
645,351✔
418
          break;
645,351✔
419
        }
420
        pTmp = pTmp->next;
×
421
      }
422
    }
423
  }
424
  taosWUnLockLatch(&pCache->dirtyLock);
645,351✔
425

426
  if (NULL != pWriteHead) {
645,351✔
427
    code = saveBlocksToDisk(pGCache, pCtx, pWriteHead);
645,351✔
428
  }
429

430
  return code;
645,351✔
431
}
432

433
static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) {
434
  SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
×
435
  if (NULL == pFileInfo) {
×
436
    return;
×
437
  }
438
  
439
  if (0 == pFileInfo->groupNum) {
×
440
    removeGroupCacheFile(pFileInfo);
×
441

442
#if 0  
443
    /* debug only */
444
    snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", pFileCtx->fileId);
445
    taosRemoveFile(pFileCtx->baseFilename);
446
    /* debug only */
447
#endif
448

449
    qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId);
×
450
    //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
451
  }
452
}
453

454
static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId, bool removeCheck) {
455
  if (pFileCtx->fileSize < GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
650,487✔
456
    return;
650,487✔
457
  }
458

459
  if (removeCheck) {
×
460
    chkRemoveVgroupCurrFile(pFileCtx, downstreamIdx, vgId);
461
  }
462
      
463
  pFileCtx->fileId++;
×
464
  pFileCtx->fileSize = 0;
×
465
}
466

467

468
static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) {
645,351✔
469
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
645,351✔
470
  int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t);
645,351✔
471
  pBufInfo->pBuf = taosMemoryMalloc(bufSize);
645,351✔
472
  if (NULL == pBufInfo->pBuf) {
645,351✔
473
    qError("group cache add block to cache failed, size:%" PRId64, bufSize);
×
474
    return terrno;
×
475
  }
476
  QRY_ERR_RET(blockDataToBuf(pBufInfo->pBuf, pBlock));
645,351✔
477

478
  SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx;
645,351✔
479

480
  pBufInfo->next = NULL;
645,351✔
481
  pBufInfo->basic.blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1);
645,351✔
482
  pBufInfo->basic.fileId = pGCache->batchFetch ? pFileCtx->fileId : pGroup->fileId;
645,351✔
483
  pBufInfo->basic.bufSize = bufSize;
645,351✔
484
  pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize);
645,351✔
485
  pBufInfo->pCtx = pCtx;
645,351✔
486
  pBufInfo->groupId = pBlock->info.id.groupId;
645,351✔
487

488
  if (pGCache->batchFetch) {    
645,351✔
489
    groupCacheSwitchNewFile(pFileCtx, pCtx->id, pGroup->vgId, false);
645,351✔
490
  }
491

492
  int32_t code = addBlkToDirtyBufList(pGCache, pCtx, &pGCache->blkCache, pBufInfo);
645,351✔
493

494
  return code;
645,351✔
495
}
496

497
void blockDataDeepClear(SSDataBlock* pDataBlock) {
4,135,773✔
498
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
4,135,773✔
499
  for (int32_t i = 0; i < numOfCols; ++i) {
15,647,384✔
500
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
11,511,611✔
501
    if (NULL == p) {
11,511,611✔
502
      qError("fail to get %d col from pDataBlock, numOfCols:%d", i, (int32_t)numOfCols);
×
503
      continue;
×
504
    }
505
    p->pData = NULL;
11,511,611✔
506
    if (IS_VAR_DATA_TYPE(p->info.type)) {
11,511,611✔
507
      p->varmeta.offset = NULL;
72,141✔
508
      p->varmeta.length = 0;
72,141✔
509
      p->varmeta.allocLen = 0;
72,141✔
510
    } else {
511
      p->nullbitmap = NULL;
11,439,470✔
512
    }
513
  }
514
  pDataBlock->info.capacity = 0;
4,135,773✔
515
  pDataBlock->info.rows = 0;
4,135,773✔
516
}
4,135,773✔
517

518
static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) {
4,135,773✔
519
  *ppDst = taosMemoryMalloc(sizeof(*pSrc));
4,135,773✔
520
  if (NULL == *ppDst) {
4,135,773✔
521
    return terrno;
×
522
  }
523
  (*ppDst)->pBlockAgg = NULL;
4,135,773✔
524
  (*ppDst)->pDataBlock = taosArrayDup(pSrc->pDataBlock, NULL);
4,135,773✔
525
  if (NULL == (*ppDst)->pDataBlock) {
4,135,773✔
526
    taosMemoryFree(*ppDst);
×
527
    return terrno;
×
528
  }
529
  TAOS_MEMCPY(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info));
4,135,773✔
530
  blockDataDeepClear(*ppDst);
4,135,773✔
531
  
532
  return TSDB_CODE_SUCCESS;
4,135,773✔
533
}
534

535
static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** ppRes) {
8,276,996✔
536
  taosWLockLatch(&pCtx->blkLock);
8,276,996✔
537
  if (taosArrayGetSize(pCtx->pFreeBlock) <= 0) {
8,276,996✔
538
    taosWUnLockLatch(&pCtx->blkLock);
3,971,938✔
539
    return buildGroupCacheBaseBlock(ppRes, pCtx->pBaseBlock);
3,971,938✔
540
  }
541
  *ppRes = *(SSDataBlock**)taosArrayPop(pCtx->pFreeBlock);
4,305,058✔
542
  taosWUnLockLatch(&pCtx->blkLock);
4,305,058✔
543

544
  return TSDB_CODE_SUCCESS;  
4,305,058✔
545
}
546

547
static int32_t releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
4,289,935✔
548
  int32_t code = TSDB_CODE_SUCCESS;
4,289,935✔
549
  
550
  blockDataDeepCleanup(pBlock);
4,289,935✔
551
  taosWLockLatch(&pCtx->blkLock);
4,289,935✔
552
  if (NULL == taosArrayPush(pCtx->pFreeBlock, &pBlock)) {
8,579,870✔
553
    code = terrno;
×
554
  }
555
  taosWUnLockLatch(&pCtx->blkLock);
4,289,935✔
556

557
  return code;
4,289,935✔
558
}
559

560

561
static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, void* pBuf, SSDataBlock** ppRes) {
8,276,996✔
562
  int32_t code = acquireBaseBlockFromList(&pGCache->pDownstreams[downstreamIdx], ppRes);
8,276,996✔
563
  if (code) {
8,276,996✔
564
    return code;
×
565
  }
566
  //TODO OPTIMIZE PERF
567
  return blockDataFromBuf(*ppRes, pBuf);
8,276,996✔
568
}
569

570
static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) {
8,276,996✔
571
  SGroupCacheFileFd *pFileFd = NULL;
8,276,996✔
572
  SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx;
8,276,996✔
573
  bool deleted = false;
8,276,996✔
574
  int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd, &deleted);
8,276,996✔
575
  if (code) {
8,276,996✔
576
    return code;
×
577
  }
578
  if (deleted) {
8,276,996✔
579
    qError("FileId:%d-%d-%d already be deleted, skip read", pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId);
×
580
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
581
  }
582
  
583
  int64_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET);
8,276,996✔
584
  if (ret < 0) {
8,276,996✔
585
    code = terrno;
×
586
    goto _return;
×
587
  }
588

589
  *ppBuf = taosMemoryMalloc(pBasic->bufSize);
8,276,996✔
590
  if (NULL == *ppBuf) {
8,276,996✔
591
    code = terrno;
×
592
    goto _return;
×
593
  }
594
  
595
  ret = taosReadFile(pFileFd->fd, *ppBuf, pBasic->bufSize);
8,276,996✔
596
  if (ret != pBasic->bufSize) {
8,276,996✔
597
    taosMemoryFreeClear(*ppBuf);
×
598
    code = terrno;
×
599
    goto _return;
×
600
  }
601

602
  qTrace("FileId:%d-%d-%d blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64, 
8,276,996✔
603
      pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset);
604

605
_return:
8,276,996✔
606

607
  releaseFdToFileCtx(pFileFd);
8,276,996✔
608
  return code;
8,276,996✔
609
}
610

611
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, SGcBlkBufBasic* pBasic, SSDataBlock** ppRes) {
8,276,996✔
612
  int32_t code = TSDB_CODE_SUCCESS;
8,276,996✔
613
  SGcBlkCacheInfo* pCache = &pGCache->blkCache;
8,276,996✔
614
  void* pBuf = NULL;
8,276,996✔
615

616
  SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId));
8,276,996✔
617
  if (NULL == pBufInfo) {
8,276,996✔
618
    code = readBlockFromDisk(pGCache, pGrp, pBasic, &pBuf);
8,276,996✔
619
    if (code) {
8,276,996✔
620
      return code;
×
621
    }
622
  } else {
623
    pBuf = pBufInfo->pBuf;
×
624
  }
625
  
626
  code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes);
8,276,996✔
627
  taosHashRelease(pCache->pDirtyBlk, pBufInfo);
8,276,996✔
628
  if (NULL == pBufInfo) {
8,276,996✔
629
    taosMemoryFree(pBuf);
8,276,996✔
630
  }
631
  
632
  if (code) {
8,276,996✔
633
    return code;
×
634
  }
635

636
  QRY_ERR_RET(taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES));
8,276,996✔
637
  
638
  return TSDB_CODE_SUCCESS;
8,276,996✔
639
}
640

641
static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
642
  pVgCtx->pTbList = pTbList;
2,711,128✔
643
  pVgCtx->id = vgId;
2,711,128✔
644
  (void)snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%p_%d_%d", 
5,422,256✔
645
     tsTempDir, taosGetPId(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pOperator, downstreamId, vgId);
5,422,256✔
646
  pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0;
2,711,128✔
647

648
  pVgCtx->fileCtx.baseNameLen = strlen(pVgCtx->fileCtx.baseFilename);
2,711,128✔
649
}
2,711,128✔
650

651
static int32_t addNewGroupToVgHash(SOperatorInfo* pOperator, SSHashObj* pHash, SGcNewGroupInfo* pNew) {
5,136✔
652
  SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx;
5,136✔
653
  if (NULL == pVgCtx) {
5,136✔
654
    SArray* pList = taosArrayInit(10, sizeof(*pNew));
2,568✔
655
    if (NULL == pList) {
2,568✔
656
      return terrno;
×
657
    }
658
    if (NULL == taosArrayPush(pList, pNew)) {
2,568✔
659
      QRY_ERR_RET(terrno);
×
660
    }
661
    
662
    SGcVgroupCtx vgCtx = {0};
2,568✔
663
    initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList);
2,568✔
664
    QRY_ERR_RET(tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)));
2,568✔
665
    
666
    pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
2,568✔
667
    if (NULL == pNew->pGroup->pVgCtx) {
2,568✔
668
      qError("fail to get vg %d ctx from vgHash", pNew->vgId);
×
669
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
670
    }
671
    
672
    return TSDB_CODE_SUCCESS;
2,568✔
673
  }
674

675
  if (NULL == taosArrayPush(pVgCtx->pTbList, pNew)) {
5,136✔
676
    QRY_ERR_RET(terrno);
×
677
  }
678
  
679
  return TSDB_CODE_SUCCESS;
2,568✔
680
}
681

682
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) {
683
  int32_t code = TSDB_CODE_SUCCESS;
807,295✔
684
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
807,295✔
685
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];  
807,295✔
686
  SOperatorParam* pDst = NULL;
807,295✔
687
  
688
  taosWLockLatch(&pCtx->grpLock);
807,295✔
689
  int32_t num = taosArrayGetSize(pCtx->pNewGrpList);
807,295✔
690
  if (num <= 0) {
807,295✔
691
    goto _return;
620,695✔
692
  }
693

694
  for (int32_t i = 0; i < num; ++i) {
373,200✔
695
    SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i);
186,600✔
696
    if (NULL == pNew) {
186,600✔
697
      qError("fail to get vg %d SGcNewGroupInfo from pNewGrpList", i);
×
698
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
699
    }
700
    
701
    if (!pGCache->batchFetch) {
186,600✔
702
      code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew);
5,136✔
703
      if (code) {
5,136✔
704
        goto _return;
×
705
      }
706
    }
707

708
    if (NULL == pDst) {
186,600✔
709
      pDst = pNew->pParam;
186,600✔
710
    } else if (pNew->pParam) {
×
711
      code = mergeOperatorParams(pDst, pNew->pParam);
×
712
      if (code) {
×
713
        goto _return;
×
714
      }
715
    }
716
  }
717

718
  taosArrayClear(pCtx->pNewGrpList);
186,600✔
719
  
720
_return:
807,295✔
721

722
  taosWUnLockLatch(&pCtx->grpLock);
807,295✔
723
  *ppParam = pDst;
807,295✔
724
  
725
  return code;
807,295✔
726
}
727

728
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx,
729
                                                         SSDataBlock** ppRes) {
730
  int32_t                  code = TSDB_CODE_SUCCESS;
807,295✔
731
  SOperatorParam*          pDownstreamParam = NULL;
1,614,590✔
732
  SSDataBlock*             pBlock = NULL;
807,295✔
733
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
807,295✔
734

735
  code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam);
807,295✔
736
  if (code) {
807,295✔
737
    return code;
×
738
  }
739

740
  SOperatorInfo* pDownstream = pOperator->pDownstream[downstreamIdx];
807,295✔
741
  if (pDownstreamParam) {
807,295✔
742
    code = pDownstream->fpSet.getNextExtFn(pDownstream, pDownstreamParam, &pBlock);
186,600✔
743
  } else {
744
    code = pDownstream->fpSet.getNextFn(pDownstream, &pBlock);
620,695✔
745
  }
746

747
  if (code) {
807,295✔
748
    qError("failed to get block from downstream, code:%s %s", tstrerror(code), GET_TASKID(pOperator->pTaskInfo));
×
749
    return code;
×
750
  }
751

752
  if (pBlock) {
807,295✔
753
    qDebug("%s res block retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId);
650,487✔
754

755
    pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
650,487✔
756
    if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) {
650,487✔
757
      code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock);
163,835✔
758
      if (code) {
163,835✔
759
        return code;
×
760
      }
761

762
      if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock,
327,670✔
763
                                &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) {
163,835✔
764
        QRY_ERR_RET(terrno);
×
765
      }
766
    }
767
  }
768

769
  code = blockDataCheck(pBlock);
807,295✔
770

771
  *ppRes = pBlock;
807,295✔
772
  return code;
807,295✔
773
}
774

775
static int32_t notifyWaitingSessions(SArray* pWaitQueue) {
1,255,569✔
776
  if (NULL == pWaitQueue || taosArrayGetSize(pWaitQueue) <= 0) {
1,255,569✔
777
    return TSDB_CODE_SUCCESS;
1,255,569✔
778
  }
779
  
780
  int32_t n = taosArrayGetSize(pWaitQueue);
×
781
  for (int32_t i = 0; i < n; ++i) {
×
782
    SGcSessionCtx* pSession = taosArrayGetP(pWaitQueue, i);
×
783
    if (NULL == pSession) {
×
784
      qError("fail to get %d SGcSessionCtx in pWaitQueue, total:%d", i, n);
×
785
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
786
    }
787
    
788
    QRY_ERR_RET(tsem_post(&pSession->waitSem));
×
789
  }
790

791
  return TSDB_CODE_SUCCESS;
×
792
}
793

794
static FORCE_INLINE int32_t handleGroupFetchDone(SGroupCacheData* pGroup) {
795
  int32_t code = TSDB_CODE_SUCCESS;
605,082✔
796
  pGroup->pBlock = NULL;
605,082✔
797
  atomic_store_8((int8_t*)&pGroup->fetchDone, true);
605,082✔
798
  
799
  (void)taosThreadMutexLock(&pGroup->mutex);
605,082✔
800
  code = notifyWaitingSessions(pGroup->waitQueue);
605,082✔
801
  taosArrayClear(pGroup->waitQueue);
605,082✔
802
  (void)taosThreadMutexUnlock(&pGroup->mutex);
605,082✔
803

804
  return code;
605,082✔
805
}
806

807
static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) {
×
808
  if (NULL == pFileCtx->pCacheFile) {
×
809
    pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
×
810
    if (NULL == pFileCtx->pCacheFile) {
×
811
      return terrno;
×
812
    }
813
    taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo);
×
814
  }
815
  
816
  SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
×
817
  if (NULL == pTmp) {
×
818
    (void)snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%u", fileId);
×
819

820
    SGroupCacheFileInfo newFile = {0};
×
821
    newFile.groupNum = 1;
×
822
    QRY_ERR_RET(taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)));
×
823
    pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
×
824
    if (NULL == pTmp) {
×
825
      qError("fail to get file %d in pCacheFile", fileId);
×
826
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
827
    }
828
  } else {
829
    pTmp->groupNum++;
×
830
  }
831

832
  qTrace("FileId:%d-%d-%d add groupNum to %u", downstreamId, vgId, fileId, pTmp->groupNum);
×
833

834
  return TSDB_CODE_SUCCESS;
×
835
}
836

837
static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
5,136✔
838
  if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastBlkUid == uid) {
5,136✔
839
    return TSDB_CODE_SUCCESS;
×
840
  }
841
  
842
  pCtx->lastBlkUid = uid;
5,136✔
843
  pGroup->pVgCtx->lastBlkUid = uid;
5,136✔
844
  
845
  int32_t i = 0;
5,136✔
846
  while (true) {
×
847
    SGcNewGroupInfo* pNew = taosArrayGet(pGroup->pVgCtx->pTbList, i++);
5,136✔
848
    if (NULL == pNew || pNew->uid == uid) {
5,136✔
849
      break;
850
    }
851
    QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup));
×
852
  }
853

854
  groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx, pGroup->downstreamIdx, pGroup->vgId, true);
5,136✔
855

856
  pGroup->fileId = pGroup->pVgCtx->fileCtx.fileId;
5,136✔
857
  pGroup->startOffset = pGroup->pVgCtx->fileCtx.fileSize;
5,136✔
858

859
  qTrace("FileId:%d-%d-%d add groupNum for group %" PRIu64, pGroup->downstreamIdx, pGroup->vgId, pGroup->pVgCtx->fileCtx.fileId, uid);
5,136✔
860

861
  if (pGroup->needCache) {
5,136✔
862
    return addFileRefTableNum(&pGroup->pVgCtx->fileCtx, pGroup->pVgCtx->fileCtx.fileId, pGroup->downstreamIdx, pGroup->vgId);
×
863
  }
864

865
  return TSDB_CODE_SUCCESS;
5,136✔
866
}
867

868

869
static FORCE_INLINE int32_t initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) {
870
  QRY_ERR_RET(taosThreadMutexInit(&pGroup->mutex, NULL));
672,263✔
871
  
872
  pGroup->downstreamIdx = downstreamIdx;
672,263✔
873
  pGroup->vgId = vgId;
672,263✔
874
  pGroup->fileId = -1;
672,263✔
875
  pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic));
672,263✔
876
  if (NULL == pGroup->blkList.pList) {
672,263✔
877
    QRY_ERR_RET(terrno);
×
878
  }
879
  pGroup->startOffset = -1;
672,263✔
880
  pGroup->needCache = needCache;
672,263✔
881
  pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
672,263✔
882

883
  return TSDB_CODE_SUCCESS;
672,263✔
884
}
885

886
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
672,263✔
887
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
672,263✔
888
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
672,263✔
889
  SGcOperatorParam* pGcParam = pParam->value;  
672,263✔
890
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
672,263✔
891
  SGroupCacheData grpData = {0};
672,263✔
892
  
893
  while (true) {
894
    if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) {
672,263✔
895
      if (terrno == TSDB_CODE_DUP_KEY) {
×
896
        *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
×
897
        if (*ppGrp) {
×
898
          break;
×
899
        }
900
      } else {
901
        return terrno;
×
902
      }
903
    }
904

905
    break;
672,263✔
906
  }
907

908
  *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
672,263✔
909
  if (NULL == *ppGrp) {
672,263✔
910
    return terrno;
×
911
  }
912
  QRY_ERR_RET(initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache));
1,344,526✔
913

914
  qDebug("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);
672,263✔
915

916
  if (pParam->pChildren) {
672,263✔
917
    SGcNewGroupInfo newGroup;
186,600✔
918
    newGroup.pGroup = *ppGrp;
186,600✔
919
    newGroup.vgId = vgId;
186,600✔
920
    newGroup.uid = uid;
186,600✔
921
    newGroup.pParam = taosArrayGetP(pParam->pChildren, 0);
186,600✔
922
    
923
    taosWLockLatch(&pCtx->grpLock);
186,600✔
924
    if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
373,200✔
925
      taosWUnLockLatch(&pCtx->grpLock);
×
926
      return terrno;
×
927
    }
928
    taosWUnLockLatch(&pCtx->grpLock);
186,600✔
929
    
930
    taosArrayDestroy(pParam->pChildren);
186,600✔
931
    pParam->pChildren = NULL;
186,600✔
932
    pCtx->fetchDone = false;
186,600✔
933
  }
934

935
  return TSDB_CODE_SUCCESS;
672,263✔
936
}
937

938
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) {
645,351✔
939
  taosWLockLatch(&pGroup->blkList.lock);
645,351✔
940
  if (NULL == taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic)) {
1,290,702✔
941
    QRY_ERR_RET(terrno);
×
942
  }
943
  *pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1;
645,351✔
944
  taosWUnLockLatch(&pGroup->blkList.lock);
645,351✔
945

946
  qDebug("block added to group cache, total block num:%" PRId64, *pIdx + 1);
645,351✔
947
  
948
  return TSDB_CODE_SUCCESS;
645,351✔
949
}
950

951
static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
650,487✔
952
  int32_t code = TSDB_CODE_SUCCESS;
650,487✔
953
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
650,487✔
954
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
650,487✔
955
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
650,487✔
956
  int64_t newBlkIdx = 0;
650,487✔
957

958
  SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId));
650,487✔
959
  if (NULL == pGroup) {
650,487✔
960
    if (pGCache->batchFetch) {
473,795✔
961
      SGcOperatorParam fakeGcParam = {0};
473,795✔
962
      SOperatorParam fakeParam = {0};
473,795✔
963
      fakeGcParam.needCache = true;
473,795✔
964
      fakeParam.downstreamIdx = pSession->downstreamIdx;
473,795✔
965
      fakeParam.value = &fakeGcParam;
473,795✔
966
      fakeParam.reUse = false;
473,795✔
967
      code = addNewGroupData(pOperator, &fakeParam, &pGroup, GROUP_CACHE_DEFAULT_VGID, pBlock->info.id.groupId);
473,795✔
968
      if (TSDB_CODE_SUCCESS != code) {
473,795✔
969
        return code;
×
970
      }
971
    } else {
972
      qError("group %" PRIu64 " not found in group hash", pBlock->info.id.groupId);
×
973
      return TSDB_CODE_INVALID_PARA;
×
974
    }
975
  }
976

977
  if (!pGCache->batchFetch) {
650,487✔
978
    code = handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId);
5,136✔
979
    if (TSDB_CODE_SUCCESS != code) {
5,136✔
980
      return code;
×
981
    }
982
  }
983

984
  if (pGroup->needCache) {
650,487✔
985
    qDebug("add block to group cache");
645,351✔
986
    
987
    SGcBlkBufInfo newBlkBuf;    
645,351✔
988
    code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf);
645,351✔
989
    if (code) {
645,351✔
990
      return code;
×
991
    }
992

993
    code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &newBlkIdx);
645,351✔
994
    if (code) {
645,351✔
995
      return code;
×
996
    }
997
  } else {
998
    qDebug("no need to add block to group cache");
5,136✔
999
    
1000
    pGroup->pBlock = pBlock;
5,136✔
1001
  }
1002

1003
  QRY_ERR_RET(notifyWaitingSessions(pGroup->waitQueue));
650,487✔
1004
  if (pGroup == pSession->pGroupData) {
650,487✔
1005
    if (pGroup->needCache) {
176,692✔
1006
      pSession->lastBlkId = newBlkIdx;
171,556✔
1007
    }
1008
    
1009
    *continueFetch = false;
176,692✔
1010
  }
1011

1012
  return TSDB_CODE_SUCCESS;
650,487✔
1013
}
1014

1015
static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) {
156,808✔
1016
  int32_t code = TSDB_CODE_SUCCESS;
156,808✔
1017
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
156,808✔
1018
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];  
156,808✔
1019
  if (pGCache->batchFetch) {
156,808✔
1020
    SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
151,672✔
1021
    SGroupCacheData* pGroup = NULL;
151,672✔
1022
    while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) {
751,618✔
1023
      QRY_ERR_RET(handleGroupFetchDone(pGroup));
599,946✔
1024
    }
1025
    pCtx->fetchDone = true;
151,672✔
1026
  } else {
1027
    int32_t uidNum = 0;
5,136✔
1028
    SGcVgroupCtx* pVgCtx = NULL;
5,136✔
1029
    int32_t iter = 0;
5,136✔
1030
    while (NULL != (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter))) {
10,272✔
1031
      uidNum = taosArrayGetSize(pVgCtx->pTbList);
5,136✔
1032
      for (int32_t i = 0; i < uidNum; ++i) {
10,272✔
1033
        SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i);
5,136✔
1034
        QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup));
10,272✔
1035
      }
1036
      taosArrayClear(pVgCtx->pTbList);
5,136✔
1037
    }    
1038
  }
1039

1040
  taosHashClear(pCtx->pWaitSessions);
156,808✔
1041

1042
  return TSDB_CODE_SUCCESS;
156,808✔
1043
}
1044

1045
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
333,500✔
1046
  bool continueFetch = true;
333,500✔
1047
  int32_t code = TSDB_CODE_SUCCESS;
333,500✔
1048
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
333,500✔
1049

1050
  while (continueFetch && TSDB_CODE_SUCCESS == code) {
983,987✔
1051
    QRY_ERR_RET(getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes));
1,614,590✔
1052
    
1053
    if (NULL == *ppRes) {
807,295✔
1054
      QRY_ERR_RET(handleDownstreamFetchDone(pOperator, pSession));
156,808✔
1055
      break;
156,808✔
1056
    } else {
1057
      QRY_ERR_RET(handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch));
650,487✔
1058
    }
1059
  }
1060

1061
  if (!continueFetch) {
333,500✔
1062
    SGcSessionCtx** ppWaitCtx = taosHashIterate(pCtx->pWaitSessions, NULL);
176,692✔
1063
    if (ppWaitCtx) {
176,692✔
1064
      taosHashCancelIterate(pCtx->pWaitSessions, ppWaitCtx);
×
1065
      int64_t* pSessionId = taosHashGetKey(ppWaitCtx, NULL);
×
1066
      if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, *pSessionId)) {
×
1067
        qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId);
×
1068
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1069
      }
1070
      SGcSessionCtx* pWaitCtx = *ppWaitCtx;
×
1071
      pWaitCtx->newFetch = true;
×
1072
      code = taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
×
1073
      if (code) {
×
1074
        qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", *pSessionId, tstrerror(code));
×
1075
        return code;
×
1076
      }
1077
      QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem));
×
1078

1079
      return code;
×
1080
    }
1081
  }
1082

1083
  if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, -1)) {
333,500✔
1084
    qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId);
×
1085
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1086
  }
1087
  
1088
  return code;
333,500✔
1089
}
1090

1091
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes, bool* got) {
12,903,946✔
1092
  int32_t code = TSDB_CODE_SUCCESS;
12,903,946✔
1093
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
12,903,946✔
1094
  *got = true;
12,903,946✔
1095

1096
  if (NULL != pSession->pGroupData) {
12,903,946✔
1097
    if (pSession->pGroupData->needCache) {
12,903,946✔
1098
      SGcBlkList* pBlkList = &pSession->pGroupData->blkList;
12,888,538✔
1099
      taosRLockLatch(&pBlkList->lock);
12,888,538✔
1100
      int64_t blkNum = taosArrayGetSize(pBlkList->pList);
12,888,538✔
1101
      if (pSession->lastBlkId < 0) {
12,888,538✔
1102
        if (blkNum > 0) {
8,470,328✔
1103
          SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0);
8,276,996✔
1104
          taosRUnLockLatch(&pBlkList->lock);
8,276,996✔
1105
          code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
8,276,996✔
1106
          pSession->lastBlkId = 0;
8,276,996✔
1107
          return code;
8,276,996✔
1108
        }
1109
      } else if ((pSession->lastBlkId + 1) < blkNum) {
4,418,210✔
1110
        SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1);
×
1111
        taosRUnLockLatch(&pBlkList->lock);
×
1112
        code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
×
1113
        pSession->lastBlkId++;
×
1114
        return code;
×
1115
      }
1116
      taosRUnLockLatch(&pBlkList->lock);
4,611,542✔
1117
    } else if (pSession->pGroupData->pBlock) {
15,408✔
1118
      *ppRes = pSession->pGroupData->pBlock;
5,136✔
1119
      pSession->pGroupData->pBlock = NULL;
5,136✔
1120
      return TSDB_CODE_SUCCESS;
5,136✔
1121
    }
1122

1123
    if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
4,621,814✔
1124
      *ppRes = NULL;
4,288,314✔
1125
      qDebug("sessionId: %" PRIu64 " fetch done", sessionId);
4,288,314✔
1126
      return code;
4,288,314✔
1127
    }
1128
  } else {
1129
    *ppRes = NULL;
×
1130
    qDebug("sessionId: %" PRIu64 " fetch done since downstream fetch done", sessionId);
×
1131
    return code;
×
1132
  }
1133

1134
  *got = false;
333,500✔
1135
  return code;
333,500✔
1136
}
1137

1138

1139
static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
×
1140
  // FOR NOW, IT'S ERROR TO REACH HERE
1141
#if 1
1142
  qError("should not enter session wait");
×
1143
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1144
#else
1145
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
1146
  SGroupCacheData* pGroup = pSession->pGroupData;
1147
  int32_t code = TSDB_CODE_SUCCESS;
1148
  bool inLock = true;
1149
  if (NULL == pGroup->waitQueue) {
1150
    pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
1151
    if (NULL == pGroup->waitQueue) {
1152
      QRY_ERR_JRET(terrno);
1153
    }
1154
  }
1155
  
1156
  if (NULL == taosArrayPush(pGroup->waitQueue, &pSession)) {
1157
    QRY_ERR_JRET(terrno);
1158
  }
1159

1160
  if (!pSession->semInit) {
1161
    QRY_ERR_JRET(tsem_init(&pSession->waitSem, 0, 0));
1162
    pSession->semInit = true;
1163
  }
1164

1165
  (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
1166
  inLock = false;
1167

1168
  QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES));
1169

1170
  code = tsem_wait(&pSession->waitSem);
1171
  if (code) {
1172
    qError("tsem_wait failed, error:%s", tstrerror(code));
1173
    QRY_ERR_JRET(code);
1174
  }
1175

1176
  if (pSession->newFetch) {
1177
    pSession->newFetch = false;
1178
    return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
1179
  }
1180

1181
  code = taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
1182
  if (code) {
1183
    qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", sessionId, tstrerror(code));
1184
    QRY_ERR_JRET(code);
1185
  }
1186

1187
  bool got = false;
1188
  return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
1189

1190
_return:
1191

1192
  if (inLock) {
1193
    (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
1194
  }
1195

1196
  return code;
1197
#endif
1198
}
1199

1200

1201
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
12,903,946✔
1202
  int32_t code = TSDB_CODE_SUCCESS;
12,903,946✔
1203
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
12,903,946✔
1204
  bool locked = false;
12,903,946✔
1205
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
12,903,946✔
1206
  
1207
  while (true) {
×
1208
    bool got = false;
12,903,946✔
1209
    code = getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
12,903,946✔
1210
    if (TSDB_CODE_SUCCESS != code || got) {
12,903,946✔
1211
      goto _return;
12,570,446✔
1212
    }
1213
    
1214
    if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId)
333,500✔
1215
      || (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) {
333,500✔
1216
      if (locked) {
333,500✔
1217
        (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
×
1218
        locked = false;
×
1219
      }
1220
      
1221
      code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
333,500✔
1222
      goto _return;
333,500✔
1223
    } else {
1224
      // FOR NOW, SHOULD NOT REACH HERE
1225
      qError("Invalid fetchSessionId:%" PRId64 ", currentSessionId:%" PRId64, pCtx->fetchSessionId, sessionId);
×
1226
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1227
    }
1228

1229
    if (locked) {
1230
      code = groupCacheSessionWait(pOperator, pCtx, sessionId, pSession, ppRes);
1231
      locked = false;
1232
      if (TSDB_CODE_SUCCESS != code) {
1233
        goto _return;
1234
      }
1235
      
1236
      break;
1237
    }
1238
    
1239
    (void)taosThreadMutexLock(&pSession->pGroupData->mutex);
1240
    locked = true;
1241
  };
1242

1243

1244
_return:
12,903,946✔
1245

1246
  if (locked) {
12,903,946✔
1247
    (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
×
1248
  }
1249

1250
  return code;
12,903,946✔
1251
}
1252

1253

1254
static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
1,355,564✔
1255
  SGcBlkCacheInfo* pCache = &pInfo->blkCache;
1,355,564✔
1256
  pCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,355,564✔
1257
  if (NULL == pCache->pDirtyBlk) {
1,355,564✔
1258
    return terrno;
×
1259
  }
1260
  taosHashSetFreeFp(pCache->pDirtyBlk, freeGcBlkBufInfo);
1,355,564✔
1261
  pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,355,564✔
1262
  if (NULL == pCache->pReadBlk) {
1,355,564✔
1263
    return terrno;
×
1264
  }
1265
  pCache->writeDownstreamId = -1;
1,355,564✔
1266

1267
  return TSDB_CODE_SUCCESS;
1,355,564✔
1268
}
1269

1270
static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) {
1271
  pSession->pParam = pGcParam;
8,475,464✔
1272
  pSession->downstreamIdx = pGcParam->downstreamIdx;
8,475,464✔
1273
  pSession->pGroupData = pGroup;
8,475,464✔
1274
  pSession->lastBlkId = -1;
8,475,464✔
1275
}
8,475,464✔
1276

1277
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
8,496,836✔
1278
  int32_t code = TSDB_CODE_SUCCESS;
8,496,836✔
1279
  SGcSessionCtx ctx = {0};
8,496,836✔
1280
  SGcOperatorParam* pGcParam = pParam->value;  
8,496,836✔
1281
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
8,496,836✔
1282
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
8,496,836✔
1283
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
8,496,836✔
1284

1285
  SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
8,496,836✔
1286
  if (NULL == pGroup && (NULL != pParam->pChildren || !pCtx->fetchDone)) {
8,496,836✔
1287
    code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId, pGcParam->tbUid);
198,468✔
1288
    if (TSDB_CODE_SUCCESS != code) {
198,468✔
1289
      return code;
×
1290
    }
1291
  }
1292

1293
  if (NULL == pGroup) {
8,496,836✔
1294
    return TSDB_CODE_SUCCESS;
21,372✔
1295
  }
1296

1297
  initGroupCacheSessionCtx(&ctx, pGcParam, pGroup);
8,475,464✔
1298

1299
  code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
8,475,464✔
1300
  if (TSDB_CODE_SUCCESS != code) {
8,475,464✔
1301
    return code;
×
1302
  }
1303

1304
  *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
8,475,464✔
1305
  if (NULL == *ppSession) {
8,475,464✔
1306
    qError("fail to get session %" PRId64 " from pSessions", pGcParam->sessionId);
×
1307
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1308
  }
1309

1310
  qDebug("session:%" PRId64 " initialized, downstreamIdx:%d, vgId:%d, tbUid:%" PRId64 ", needCache:%d", 
8,475,464✔
1311
    pGcParam->sessionId, pGcParam->downstreamIdx, pGcParam->vgId, pGcParam->tbUid, pGcParam->needCache);
1312
  
1313
  return TSDB_CODE_SUCCESS;
8,475,464✔
1314
}
1315

1316
static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) {
12,925,318✔
1317
  int32_t code = TSDB_CODE_SUCCESS;
12,925,318✔
1318
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
12,925,318✔
1319
  SGcOperatorParam* pGcParam = pParam->value;
12,925,318✔
1320
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
12,925,318✔
1321
  SGcSessionCtx* pSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
12,925,318✔
1322
  if (NULL == pSession) {
12,925,318✔
1323
    int32_t code = initGroupCacheSession(pOperator, pParam, &pSession);
8,496,836✔
1324
    if (TSDB_CODE_SUCCESS != code) {
8,496,836✔
1325
      return code;
×
1326
    }
1327
    if (NULL == pSession) {
8,496,836✔
1328
      qDebug("session %" PRId64 " in downstream %d total got 0 rows since downtream fetch done", pGcParam->sessionId, pCtx->id);
21,372✔
1329
      return TSDB_CODE_SUCCESS;
21,372✔
1330
    }
1331
  } else if (pSession->pGroupData->needCache) {
4,428,482✔
1332
    SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
4,418,210✔
1333
    if (ppBlock) {
4,418,210✔
1334
      QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock));
4,289,935✔
1335
      code = taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
4,289,935✔
1336
      if (code) {
4,289,935✔
1337
        qError("taosHashRemove session %" PRId64 " from pReadBlk failed, error: %s", pGcParam->sessionId, tstrerror(code));
×
1338
        QRY_ERR_RET(code);
×
1339
      }
1340
    }
1341
  }
1342
  
1343
  QRY_ERR_RET(getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes));
12,903,946✔
1344
  if (NULL == *ppRes) {
12,903,946✔
1345
    qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
4,445,122✔
1346
    code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
4,445,122✔
1347
    if (code) {
4,445,122✔
1348
      qError("taosHashRemove session %" PRId64 " from pSessions failed, error: %s", pGcParam->sessionId, tstrerror(code));
×
1349
      QRY_ERR_RET(code);
×
1350
    }
1351
  } else {
1352
    pSession->resRows += (*ppRes)->info.rows;
8,458,824✔
1353
    qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
8,458,824✔
1354
  }
1355

1356
  return code;
12,903,946✔
1357
}
1358

1359
static int32_t initGroupCacheExecInfo(SOperatorInfo*        pOperator) {
1,355,564✔
1360
  SGroupCacheOperatorInfo* pInfo = pOperator->info;
1,355,564✔
1361
  pInfo->execInfo.pDownstreamBlkNum = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(int64_t));
1,355,564✔
1362
  if (NULL == pInfo->execInfo.pDownstreamBlkNum) {
1,355,564✔
1363
    return terrno;
×
1364
  }
1365
  return TSDB_CODE_SUCCESS;
1,355,564✔
1366
}
1367

1368
static void freeRemoveGroupCacheData(void* p) {
672,263✔
1369
  SGroupCacheData* pGroup = p;
672,263✔
1370
  if (pGroup->vgId > 0 && pGroup->needCache) {
672,263✔
1371
    SGcFileCacheCtx* pFileCtx = &pGroup->pVgCtx->fileCtx;
×
1372
    if (pGroup->fileId >= 0) {
×
1373
      SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
×
1374
      uint32_t remainNum = atomic_sub_fetch_32(&pFileInfo->groupNum, 1);
×
1375

1376
      qTrace("FileId:%d-%d-%d sub group num to %u", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId, remainNum);
×
1377

1378
      if (0 == remainNum && pGroup->fileId != pFileCtx->fileId) {
×
1379
        removeGroupCacheFile(pFileInfo);
×
1380

1381
#if 0
1382
        /* debug only */
1383
        snprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], sizeof(pFileCtx->baseFilename) - pFileCtx->baseNameLen, "_%d", pGroup->fileId);
1384
        taosRemoveFile(pFileCtx->baseFilename);
1385
        /* debug only */
1386
#endif
1387

1388
        qTrace("FileId:%d-%d-%d removed", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId);
×
1389
        //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
1390
      }
1391
    }
1392
  }
1393

1394
  taosArrayDestroy(pGroup->waitQueue);
672,263✔
1395
  taosArrayDestroy(pGroup->blkList.pList);
672,263✔
1396
  (void)taosThreadMutexDestroy(&pGroup->mutex);
672,263✔
1397

1398
  qTrace("group removed");
672,263✔
1399
}
672,263✔
1400

1401

1402

1403
static int32_t initGroupCacheDownstreamCtx(SOperatorInfo*          pOperator) {
1,355,564✔
1404
  SGroupCacheOperatorInfo* pInfo = pOperator->info;
1,355,564✔
1405
  pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams));
1,355,564✔
1406
  if (NULL == pInfo->pDownstreams) {
1,355,564✔
1407
    return terrno;
×
1408
  }
1409
  pInfo->downstreamNum = pOperator->numOfDownstream;
1,355,564✔
1410

1411
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
4,066,692✔
1412
    SGcDownstreamCtx* pCtx = &pInfo->pDownstreams[i];
2,711,128✔
1413
    pCtx->id = i;
2,711,128✔
1414
    pCtx->fetchSessionId = -1;
2,711,128✔
1415
    pCtx->lastBlkUid = 0;
2,711,128✔
1416
    pCtx->pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
2,711,128✔
1417
    if (NULL == pCtx->pVgTbHash) {
2,711,128✔
1418
      return terrno;
×
1419
    }
1420
    tSimpleHashSetFreeFp(pCtx->pVgTbHash, freeSGcVgroupCtx);      
2,711,128✔
1421

1422
    if (pInfo->batchFetch) {
2,711,128✔
1423
      int32_t defaultVg = 0;
2,708,560✔
1424
      SGcVgroupCtx vgCtx = {0};
2,708,560✔
1425
      initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL);      
2,708,560✔
1426
      QRY_ERR_RET(tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)));
2,708,560✔
1427
    }
1428
    
1429
    pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
2,711,128✔
1430
    if (NULL == pCtx->pNewGrpList) {
2,711,128✔
1431
      return terrno;
×
1432
    }
1433
    if (!pInfo->globalGrp) {
2,711,128✔
1434
      pCtx->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,711,128✔
1435
      if (pCtx->pGrpHash == NULL) {
2,711,128✔
1436
        return terrno;
×
1437
      }
1438
      taosHashSetFreeFp(pCtx->pGrpHash, freeRemoveGroupCacheData);      
2,711,128✔
1439
    }
1440

1441
    pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,711,128✔
1442
    if (pCtx->pSessions == NULL) {
2,711,128✔
1443
      return terrno;
×
1444
    }
1445
    taosHashSetFreeFp(pCtx->pSessions, freeSGcSessionCtx);
2,711,128✔
1446
  
1447
    pCtx->pFreeBlock = taosArrayInit(10, POINTER_BYTES);
2,711,128✔
1448
    if (NULL == pCtx->pFreeBlock) {
2,711,128✔
1449
      return terrno;
×
1450
    }
1451
    
1452
    pCtx->pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,711,128✔
1453
    if (pCtx->pWaitSessions == NULL) {
2,711,128✔
1454
      return terrno;
×
1455
    }
1456

1457
    (void)snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%p_%d", 
8,133,384✔
1458
      tsTempDir, taosGetPId(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pOperator, pCtx->id);
5,422,256✔
1459
    pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0;
2,711,128✔
1460
    pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename);
2,711,128✔
1461
  }
1462

1463
  return TSDB_CODE_SUCCESS;
1,355,564✔
1464
}
1465

1466
static int32_t groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
12,925,318✔
1467
  *pRes = NULL;
12,925,318✔
1468

1469
  SSDataBlock* pBlock = NULL;
12,925,318✔
1470
  int32_t      code = 0;
12,925,318✔
1471

1472
  code = getBlkFromGroupCache(pOperator, &pBlock, pParam);
12,925,318✔
1473
  if (TSDB_CODE_SUCCESS != code) {
12,925,318✔
1474
    pOperator->pTaskInfo->code = code;
×
1475
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1476
  }
1477

1478
  *pRes = pBlock;
12,925,318✔
1479
  return code;
12,925,318✔
1480
}
1481

1482
static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam* pParam) {
×
1483
  SGcNotifyOperatorParam* pGcParam = pParam->value;
×
1484
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
×
1485
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pGcParam->downstreamIdx];
×
1486
  SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
×
1487

1488
  qTrace("try to remove group %" PRIu64, pGcParam->tbUid);
×
1489
  if (taosHashRemove(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid))) {
×
1490
    qError("failed to remove group %" PRIu64 " in vgId %d downstreamIdx %d", pGcParam->tbUid, pGcParam->vgId, pGcParam->downstreamIdx);
×
1491
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1492
  }
1493

1494
  return TSDB_CODE_SUCCESS;
×
1495
}
1496

1497
static void resetGroupCacheBlockCache(SGcBlkCacheInfo* pCache) {
×
1498
  taosHashClear(pCache->pDirtyBlk);
×
1499

1500
  void* p = NULL;
×
1501
  while (NULL != (p = taosHashIterate(pCache->pReadBlk, p))) {
×
1502
    blockDataDeepCleanup(*(SSDataBlock**)p);
×
1503
    freeGcBlockInList(p);
×
1504
  }
1505

1506
  taosHashClear(pCache->pReadBlk);
×
1507

1508
  pCache->dirtyLock = 0;
×
1509
  pCache->pDirtyHead = NULL;
×
1510
  pCache->pDirtyTail = NULL;
×
1511
  pCache->blkCacheSize = 0;
×
1512
  pCache->writeDownstreamId = -1;
×
1513

1514
  return;
×
1515
}
1516

1517
static int32_t resetGroupCacheDownstreamCtx(SOperatorInfo* pOper) {
×
1518
  int32_t code = 0, lino = 0;
×
1519
  SGroupCacheOperatorInfo* pInfo = pOper->info;
×
1520
  if (NULL == pInfo->pDownstreams) {
×
1521
    return TSDB_CODE_SUCCESS;
×
1522
  }
1523
  
1524
  for (int32_t i = 0; i < pInfo->downstreamNum; ++i) {
×
1525
    SGcDownstreamCtx* pCtx = &pInfo->pDownstreams[i];
×
1526
    taosArrayClear(pCtx->pNewGrpList);
×
1527
    taosHashClear(pCtx->pGrpHash);
×
1528

1529
    tSimpleHashClear(pCtx->pVgTbHash);
×
1530
    if (pInfo->batchFetch) {
×
1531
      int32_t defaultVg = 0;
×
1532
      SGcVgroupCtx vgCtx = {0};
×
1533
      initGcVgroupCtx(pOper, &vgCtx, pCtx->id, defaultVg, NULL);      
×
1534
      TAOS_CHECK_EXIT(tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)));
×
1535
    }
1536
    
1537
    taosArrayClearEx(pCtx->pFreeBlock, freeGcBlockInList);
×
1538
    taosHashClear(pCtx->pSessions);
×
1539
    taosHashClear(pCtx->pWaitSessions);
×
1540
    freeSGcFileCacheCtx(&pCtx->fileCtx);
×
1541

1542
    pCtx->grpLock = 0;
×
1543
    pCtx->fetchSessionId = -1;
×
1544
    pCtx->blkLock = 0;
×
1545
    pCtx->lastBlkUid = 0;
×
1546
    pCtx->pBaseBlock = NULL;
×
1547
    pCtx->fetchDone = false;
×
1548
  }
1549

1550
_exit:
×
1551

1552
  if (code) {
×
1553
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1554
  }
1555

1556
  return code;
×
1557
}
1558

1559
static int32_t resetGroupCacheOperState(SOperatorInfo* pOper) {
×
1560
  int32_t code = 0, lino = 0;
×
1561
  SGroupCacheOperatorInfo* pInfo = pOper->info;
×
1562

1563
  pOper->status = OP_NOT_OPENED;
×
1564

1565
  resetGroupCacheBlockCache(&pInfo->blkCache);
×
1566

1567
  taosHashClear(pInfo->pGrpHash);
×
1568

1569
  TAOS_CHECK_EXIT(resetGroupCacheDownstreamCtx(pOper));
×
1570

1571
  memset(pInfo->execInfo.pDownstreamBlkNum, 0, pOper->numOfDownstream * sizeof(int64_t));
×
1572
  
1573
_exit:
×
1574

1575
  if (code) {
×
1576
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1577
  }
1578

1579
  return code;
×
1580
}
1581

1582
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
1,355,564✔
1583
                                     SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1584
                                     SOperatorInfo** pOptrInfo) {
1585
  QRY_PARAM_CHECK(pOptrInfo);
1,355,564✔
1586
  int32_t code = TSDB_CODE_SUCCESS;
1,355,564✔
1587

1588
  SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
1,355,564✔
1589
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,355,564✔
1590
  if (pOperator == NULL || pInfo == NULL) {
1,355,564✔
1591
    code = terrno;
×
1592
    goto _error;
×
1593
  }
1594
  initOperatorCostInfo(pOperator);
1,355,564✔
1595

1596
  pOperator->transparent = true;
1,355,564✔
1597
  pOperator->pPhyNode = pPhyciNode;
1,355,564✔
1598
  
1599
  setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
1,355,564✔
1600

1601
  pInfo->maxCacheSize = 0;
1,355,564✔
1602
  pInfo->grpByUid = pPhyciNode->grpByUid;
1,355,564✔
1603
  pInfo->globalGrp = pPhyciNode->globalGrp;
1,355,564✔
1604
  pInfo->batchFetch = pPhyciNode->batchFetch;
1,355,564✔
1605
  
1606
  if (!pInfo->grpByUid) {
1,355,564✔
1607
    qError("only group cache by uid is supported now");
×
1608
    code = TSDB_CODE_INVALID_PARA;
×
1609
    goto _error;
×
1610
  }
1611
  
1612
  if (pPhyciNode->pGroupCols) {
1,355,564✔
1613
    code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols);
×
1614
    if (code) {
×
1615
      goto _error;
×
1616
    }
1617
  }
1618

1619
  code = initGroupCacheBlockCache(pInfo);
1,355,564✔
1620
  if (code) {
1,355,564✔
1621
    goto _error;
×
1622
  }
1623

1624
  if (pInfo->globalGrp) {
1,355,564✔
1625
    pInfo->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1626
    if (pInfo->pGrpHash == NULL) {
×
1627
      code = terrno;
×
1628
      goto _error;
×
1629
    }
1630
    taosHashSetFreeFp(pInfo->pGrpHash, freeRemoveGroupCacheData);
×
1631
  }
1632

1633
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
1,355,564✔
1634
  if (TSDB_CODE_SUCCESS != code) {
1,355,564✔
1635
    goto _error;
×
1636
  }
1637

1638
  code = initGroupCacheDownstreamCtx(pOperator);
1,355,564✔
1639
  if (TSDB_CODE_SUCCESS != code) {
1,355,564✔
1640
    goto _error;
×
1641
  }
1642

1643
  code = initGroupCacheExecInfo(pOperator);
1,355,564✔
1644
  if (TSDB_CODE_SUCCESS != code) {
1,355,564✔
1645
    goto _error;
×
1646
  }
1647

1648
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, groupCacheTableCacheEnd);
1,355,564✔
1649

1650
  setOperatorResetStateFn(pOperator, resetGroupCacheOperState);
1,355,564✔
1651

1652
  qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch);
1,355,564✔
1653

1654
  *pOptrInfo = pOperator;
1,355,564✔
1655
  return TSDB_CODE_SUCCESS;
1,355,564✔
1656

1657
_error:
×
1658
  if (pInfo != NULL) {
×
1659
    destroyGroupCacheOperator(pInfo);
×
1660
  }
1661

1662
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1663
  pTaskInfo->code = code;
×
1664
  return code;
×
1665
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc