• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5052

13 May 2026 12:00PM UTC coverage: 73.338% (-0.02%) from 73.358%
#5052

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

761 existing lines in 163 files now uncovered.

281469 of 383795 relevant lines covered (73.34%)

134502812.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.36
/source/libs/new-stream/src/dataSinkFile.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdint.h>
17
#include <stdio.h>
18
#include "dataSink.h"
19
#include "freeBlockMgr.h"
20
#include "osAtomic.h"
21
#include "osFile.h"
22
#include "osMemory.h"
23
#include "osTime.h"
24
#include "stream.h"
25
#include "taoserror.h"
26
#include "tarray.h"
27
#include "tdatablock.h"
28
#include "tdef.h"
29
#include "thash.h"
30
#include "tutil.h"
31

32
// Directory path for stream data sink files; filled in by initDataSinkFileDir().
char      gDataSinkFilePath[PATH_MAX] = {0};
33
// NOTE(review): not referenced by any function visible in this file; presumably a per-group block size cap - confirm.
const int gFileGroupBlockMaxSize = 64 * 1024;  // 64K
34

35
int32_t initDataSinkFileDir() {
735,654✔
36
  int32_t code = 0;
735,654✔
37
  int     ret = snprintf(gDataSinkFilePath, sizeof(gDataSinkFilePath), "%s/tdengine_stream_data/", tsTempDir);
735,654✔
38
  if (ret < 0) {
735,654✔
39
    stError("failed to get stream data sink path ret:%d", ret);
×
40
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
41
  }
42

43
  if (!taosIsDir(gDataSinkFilePath)) {
735,654✔
44
    code = taosMulMkDir(gDataSinkFilePath);
371,635✔
45
  }
46
  if (code != 0) {
735,654✔
47
    return code;
×
48
  }
49
  stInfo("create stream data sink path %s", gDataSinkFilePath);
735,654✔
50
  return TSDB_CODE_SUCCESS;
735,654✔
51
}
52

53
// Allocates a zeroed SDataSinkFileMgr for streamId and returns it through ppDaSinkFileMgr.
// The backing file name is built from gDataSinkFilePath, the literal "stream", the current
// timestamp in seconds and the stream id. No file is opened here: both handles start as NULL.
//
// Returns terrno when the allocation fails, TSDB_CODE_SUCCESS otherwise.
static int32_t createStreamDataSinkFileMgr(int64_t streamId, SDataSinkFileMgr** ppDaSinkFileMgr) {
  SDataSinkFileMgr* pMgr = (SDataSinkFileMgr*)taosMemoryCalloc(1, sizeof(SDataSinkFileMgr));
  if (NULL == pMgr) {
    return terrno;
  }

  int32_t createdAt = taosGetTimestampSec();
  snprintf(pMgr->fileName, FILENAME_MAX, "%s//%s_%d_%" PRId64, gDataSinkFilePath, "stream", createdAt, streamId);

  // handles and group markers
  pMgr->writeFilePtr = NULL;
  pMgr->readFilePtr = NULL;
  pMgr->writingGroupId = -1;
  pMgr->readingGroupId = -1;

  // file space bookkeeping
  pMgr->fileSize = 0;
  pMgr->fileBlockCount = 0;
  pMgr->fileBlockUsedCount = 0;
  tRBTreeCreate(&pMgr->pFreeFileBlockList, compareFreeBlock);

  *ppDaSinkFileMgr = pMgr;
  return TSDB_CODE_SUCCESS;
}
75

76
// Tears down a file manager: closes the write and read handles when they are open, removes the
// backing file when a name is set, clears the free-block list and frees the manager through
// taosMemoryFreeClear. A NULL ppDaSinkFileMgr or NULL *ppDaSinkFileMgr is a no-op.
// Close/remove failures are logged and otherwise ignored.
void destroyStreamDataSinkFile(SDataSinkFileMgr** ppDaSinkFileMgr) {
  if (ppDaSinkFileMgr == NULL) {
    return;
  }

  SDataSinkFileMgr* pMgr = *ppDaSinkFileMgr;
  if (pMgr == NULL) {
    return;
  }

  if (pMgr->writeFilePtr != NULL) {
    if (taosCloseFile(&pMgr->writeFilePtr) != 0) {
      stError("failed to close file %s, lineno:%d", pMgr->fileName, __LINE__);
    }
    pMgr->writeFilePtr = NULL;
  }

  if (pMgr->readFilePtr != NULL) {
    if (taosCloseFile(&pMgr->readFilePtr) != 0) {
      stError("failed to close file %s, lineno:%d", pMgr->fileName, __LINE__);
    }
    pMgr->readFilePtr = NULL;
  }

  if (pMgr->fileName[0] != '\0') {
    if (taosRemoveFile(pMgr->fileName) != 0) {
      stError("failed to remove file %s, lineno:%d", pMgr->fileName, __LINE__);
    }
    pMgr->fileName[0] = '\0';
  }

  clearAllFreeBlocks(&pMgr->pFreeFileBlockList);
  taosMemoryFreeClear((*ppDaSinkFileMgr));
}
104

105
// Creates the file manager of pStreamDataSink if it does not have one yet.
// Returns TSDB_CODE_SUCCESS when a manager already exists, otherwise the result of
// createStreamDataSinkFileMgr().
static int32_t initStreamDataSinkFile(SSlidingTaskDSMgr* pStreamDataSink) {
  if (pStreamDataSink->pFileMgr != NULL) {
    return TSDB_CODE_SUCCESS;
  }
  return createStreamDataSinkFileMgr(pStreamDataSink->streamId, &pStreamDataSink->pFileMgr);
}
111

112
// Opens pFileMgr->fileName with TD_FILE_CREATE | TD_FILE_WRITE unless writeFilePtr is already set.
// The new handle is published with a compare-and-exchange against NULL; when the exchange reports
// that a handle was already stored, the handle opened here is closed again.
//
// Returns terrno when the file cannot be opened, TSDB_CODE_SUCCESS otherwise.
static int32_t openFileForWrite(SDataSinkFileMgr* pFileMgr) {
  if (atomic_load_ptr(&pFileMgr->writeFilePtr) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  void* opened = taosOpenFile(pFileMgr->fileName, TD_FILE_CREATE | TD_FILE_WRITE);
  if (opened == NULL) {
    stError("open file %s failed, err: %s", pFileMgr->fileName, terrMsg);
    return terrno;
  }

  void* previous = atomic_val_compare_exchange_ptr(&pFileMgr->writeFilePtr, NULL, opened);
  if (previous == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the slot was filled in the meantime: drop our own handle
  TdFilePtr redundant = (TdFilePtr)opened;
  if (taosCloseFile(&redundant) != 0) {
    stError("failed to close file %s, lineno:%d", pFileMgr->fileName, __LINE__);
  }
  return TSDB_CODE_SUCCESS;
}
131

132
// Opens pFileMgr->fileName with TD_FILE_CREATE | TD_FILE_READ unless readFilePtr is already set.
// The new handle is published with a compare-and-exchange against NULL; when the exchange reports
// that a handle was already stored, the handle opened here is closed again.
//
// Returns terrno when the file cannot be opened, TSDB_CODE_SUCCESS otherwise.
static int32_t openFileForRead(SDataSinkFileMgr* pFileMgr) {
  if (atomic_load_ptr(&pFileMgr->readFilePtr) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  void* opened = taosOpenFile(pFileMgr->fileName, TD_FILE_CREATE | TD_FILE_READ);
  if (opened == NULL) {
    stError("open file %s failed, err: %s", pFileMgr->fileName, terrMsg);
    return terrno;
  }

  void* previous = atomic_val_compare_exchange_ptr(&pFileMgr->readFilePtr, NULL, opened);
  if (previous == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the slot was filled in the meantime: drop our own handle
  TdFilePtr redundant = (TdFilePtr)opened;
  if (taosCloseFile(&redundant) != 0) {
    stError("failed to close file %s, lineno:%d", pFileMgr->fileName, __LINE__);
  }
  return TSDB_CODE_SUCCESS;
}
152

153
// Picks the file region that will hold needSize bytes and reports it through pGroupBlockOffset.
//
// A region from the free-block list is reused when popBestFitBlock() finds one; the whole region
// (its full length, which may exceed needSize) is handed out. Otherwise a new region of exactly
// needSize bytes is appended at the current end of the file and the file counters are advanced.
static void getFreeBlock(SDataSinkFileMgr* pFileMgr, int32_t needSize, SFileBlockInfo* pGroupBlockOffset) {
  FreeBlock* pFreeBlock = popBestFitBlock(&pFileMgr->pFreeFileBlockList, needSize);
  if (pFreeBlock != NULL) {
    pGroupBlockOffset->size = pFreeBlock->length;
    pGroupBlockOffset->offset = pFreeBlock->start;
    // The node has been popped from the list and its values copied out, so it must be released
    // here; previously the only destroyFreeBlock() call sat on the path where the pointer is NULL.
    // NOTE(review): assumes popBestFitBlock() transfers ownership of the node - confirm.
    destroyFreeBlock(pFreeBlock);
    return;
  }

  // nothing reusable: grow the file
  pGroupBlockOffset->offset = pFileMgr->fileSize;
  pGroupBlockOffset->size = needSize;
  pFileMgr->fileBlockCount++;
  pFileMgr->fileBlockUsedCount++;
  pFileMgr->fileSize += needSize;
}
168

169
// Records the file region described by pBlockInfo in the free-block list of pFileMgr.
// Regions with size <= 0 are ignored. Returns terrno when the list node cannot be created,
// TSDB_CODE_SUCCESS otherwise.
static int32_t addToFreeBlock(SDataSinkFileMgr* pFileMgr, const SFileBlockInfo* pBlockInfo) {
  if (pBlockInfo->size > 0) {
    FreeBlock* pNode = createFreeBlock(pBlockInfo->offset, pBlockInfo->size);
    if (pNode == NULL) {
      stError("failed to create free block, err: %s", terrMsg);
      return terrno;
    }
    insertFreeBlock(&pFileMgr->pFreeFileBlockList, pNode);
  }
  return TSDB_CODE_SUCCESS;
}
179

180
// Advances the iterator over file-resident data and tells whether it is exhausted.
//
// DATA_CLEAN_EXPIRED mode: offset is incremented; returns false while it still indexes an entry
// of blocksInFile, true once it has moved past the last one.
// Other modes: the iterator is not moved here; returns true when blocksInMem is empty.
bool setNextIteratorFromFile(SResultIter** ppResult) {
  SResultIter* pIter = *ppResult;

  if (pIter->cleanMode != DATA_CLEAN_EXPIRED) {
    // pointer movement is completed while reading data
    // todo
    SAlignGrpMgr* pAlignGrp = (SAlignGrpMgr*)pIter->groupData;
    return pAlignGrp->blocksInMem->size == 0;
  }

  SSlidingGrpMgr* pSlidingGrp = (SSlidingGrpMgr*)pIter->groupData;
  ++pIter->offset;
  bool hasMore = pIter->offset < pSlidingGrp->blocksInFile->size;
  return !hasMore;
}
197

198
// Appends the pBlock pointer to pResult->tmpBlocksInMem, creating the array on first use.
// A NULL pBlock is accepted and ignored. Returns terrno when the array cannot be created or
// grown, TSDB_CODE_SUCCESS otherwise.
static int32_t appendTmpSBlocksInMem(SResultIter* pResult, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pResult->tmpBlocksInMem) {
    pResult->tmpBlocksInMem = taosArrayInit(1, sizeof(SSDataBlock*));
    if (NULL == pResult->tmpBlocksInMem) {
      return terrno;
    }
  }

  void* pSlot = taosArrayPush(pResult->tmpBlocksInMem, &pBlock);
  return (pSlot != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
214

215
static int32_t readFileDataToSlidingWindows(SResultIter* pResult, SSlidingGrpMgr* pSlidingGrpMgr, int32_t tsColSlotId,
22,346,724✔
216
                                            SBlocksInfoFile* pBlockInfo, bool* finished) {
217
  int32_t code = TSDB_CODE_SUCCESS;
22,346,724✔
218
  int32_t lino = 0;
22,346,724✔
219

220
  char* buf = taosMemoryCalloc(1, pBlockInfo->dataLen);
22,346,724✔
221
  if (buf == NULL) {
22,346,822✔
222
    code = terrno;
×
223
    QUERY_CHECK_CODE(code, lino, _exit);
×
224
  }
225

226
  if (!pResult->pFileMgr->readFilePtr) {
22,346,822✔
227
    code = openFileForRead(pResult->pFileMgr);
1,386✔
228
    if (code != 0) {
1,386✔
229
      stError("failed to open file for read, err: %s", terrMsg);
×
230
      return code;
×
231
    }
232
  }
233

234
  int64_t readLen = taosPReadFile(pResult->pFileMgr->readFilePtr, buf, pBlockInfo->dataLen, pBlockInfo->groupOffset);
22,346,822✔
235
  if (readLen < 0 || readLen != pBlockInfo->dataLen) {
22,346,922✔
236
    code = terrno;
×
237
    QUERY_CHECK_CODE(code, lino, _exit);
×
238
  }
239

240
  *finished = false;
22,346,922✔
241
  char* start = buf;
22,346,922✔
242
  while (true) {
81,404,428✔
243
    SSlidingWindowInMem* pWindowData = (SSlidingWindowInMem*)start;
103,751,350✔
244

245
    if (pWindowData->startTime > pResult->reqEndTime) {
103,751,350✔
246
      *finished = true;
1,361,942✔
247
      break;
1,361,942✔
248
    } else if (pWindowData->endTime < pResult->reqStartTime) {
102,389,408✔
249
      // do nothing
250
    } else {
251
      *finished = false;
20,904,344✔
252
      SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
20,904,344✔
253
      if (pBlock == NULL) {
20,904,050✔
254
        return terrno;
×
255
      }
256
      QUERY_CHECK_CODE(code, lino, _exit);
20,904,050✔
257
      code = blockSpecialDecodeLaterPart(pBlock, getWindowDataBuf(pWindowData), tsColSlotId, pResult->reqStartTime,
20,904,050✔
258
                                         pResult->reqEndTime);
259
      QUERY_CHECK_CODE(code, lino, _exit);
20,904,246✔
260
      if (pBlock->info.rows == 0) {
20,904,246✔
261
        blockDataDestroy(pBlock);
×
262
      } else {
263
        code = appendTmpSBlocksInMem(pResult, pBlock);
20,904,246✔
264
        QUERY_CHECK_CODE(code, lino, _exit);
20,904,244✔
265
      }
266
    }
267
    start += sizeof(SSlidingWindowInMem) + pWindowData->dataLen;
102,389,308✔
268
    if (start >= buf + pBlockInfo->dataLen) {
102,389,308✔
269
      break;  // end of current data buffer
20,984,880✔
270
    }
271
  }
272
_exit:
22,346,822✔
273
  if (code != TSDB_CODE_SUCCESS) {
22,346,822✔
274
    stError("failed to read data from file, err: %s, lineno:%d", terrMsg, lino);
×
275
    if (buf) {
×
276
      taosMemoryFreeClear(buf);
×
277
    }
278
    if (pResult->tmpBlocksInMem != NULL) {
×
279
      taosArrayDestroy(pResult->tmpBlocksInMem);
×
280
      pResult->tmpBlocksInMem = NULL;
×
281
    }
282
    return code;
×
283
  }
284

285
  taosMemoryFree(buf);
22,346,822✔
286
  return TSDB_CODE_SUCCESS;
22,346,622✔
287
}
288

289
// Scans the file blocks of a sliding group, starting at pResult->offset, until one of them
// yields data for the requested time range.
//
// For every block visited:
//   - it is decoded with readFileDataToSlidingWindows();
//   - if it produced no temporary blocks and the scan is not finished, its file region is put
//     on the free-block list;
//   - if the scan is finished, dataPos becomes DATA_SINK_ALL_TMP;
//   - if temporary blocks are available, dataPos is set (kept at DATA_SINK_ALL_TMP, otherwise
//     DATA_SINK_PART_TMP), winIndex is reset to 0, *ppBlock receives the first block and the
//     scan stops without advancing offset.
// *ppBlock is left untouched when no block yields data.
//
// Returns TSDB_CODE_STREAM_INTERNAL_ERROR for an invalid block record, the failing code of a
// helper, or TSDB_CODE_SUCCESS.
int32_t readSlidingDataFromFile(SResultIter* pResult, SSDataBlock** ppBlock, int32_t tsColSlotId) {
  int32_t           lino = 0;
  SDataSinkFileMgr* pFileMgr = pResult->pFileMgr;
  SSlidingGrpMgr*   pGrp = (SSlidingGrpMgr*)pResult->groupData;

  for (; pResult->offset < taosArrayGetSize(pGrp->blocksInFile); pResult->offset++) {
    SBlocksInfoFile* pInfo = (SBlocksInfoFile*)taosArrayGet(pGrp->blocksInFile, pResult->offset);
    if (pInfo == NULL || pInfo->dataLen <= 0) {
      stError("invalid block info at offset:%" PRId64 ", pBlockInfo:%p", pResult->offset, pInfo);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    bool    finished = false;
    int32_t code = readFileDataToSlidingWindows(pResult, pGrp, tsColSlotId, pInfo, &finished);
    if (code != TSDB_CODE_SUCCESS) {
      stError("failed to read file data to sliding windows, err: %s, lineno:%d", terrMsg, lino);
      return code;
    }

    bool hasTmpBlocks = (pResult->tmpBlocksInMem != NULL) && (pResult->tmpBlocksInMem->size > 0);

    if (!hasTmpBlocks && !finished) {
      SFileBlockInfo reclaimed = {.offset = pInfo->groupOffset, .size = pInfo->capacity};
      code = addToFreeBlock(pFileMgr, &reclaimed);
      if (code != TSDB_CODE_SUCCESS) {
        stError("failed to add to free block, err: %s, lineno:%d", terrMsg, lino);
        return code;
      }
    }

    if (finished) {
      pResult->dataPos = DATA_SINK_ALL_TMP;
    }

    if (hasTmpBlocks) {
      if (pResult->dataPos != DATA_SINK_ALL_TMP) {
        pResult->dataPos = DATA_SINK_PART_TMP;
      }
      pResult->winIndex = 0;
      *ppBlock = *(SSDataBlock**)taosArrayGet(pResult->tmpBlocksInMem, pResult->winIndex);
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}
335

336
// Placeholder for reading align-mode data from file: no argument is read, *ppBlock is left
// untouched and TSDB_CODE_SUCCESS is always returned.
int32_t readAlignDataFromFile(SResultIter* pResult, SSDataBlock** ppBlock, int32_t tsColSlotId) {
  return TSDB_CODE_SUCCESS;
}
341

342
// Dispatches a file read by clean mode: DATA_CLEAN_EXPIRED goes to readSlidingDataFromFile(),
// every other mode to readAlignDataFromFile(). The callee's result is returned unchanged.
int32_t readDataFromFile(SResultIter* pResult, SSDataBlock** ppBlock, int32_t tsColSlotId) {
  if (pResult->cleanMode != DATA_CLEAN_EXPIRED) {
    return readAlignDataFromFile(pResult, ppBlock, tsColSlotId);
  }
  return readSlidingDataFromFile(pResult, ppBlock, tsColSlotId);
}
349

350
// Moves in-memory sliding windows of one group into the task's data sink file.
//
// Windows are taken from the front of pSlidingGrp->winDataInMem for as long as their combined
// size (header + payload each) stays within DS_FILE_BLOCK_SIZE. They are written with a single
// vectored write into a file region obtained from getFreeBlock(), the region is recorded in
// pSlidingGrp->blocksInFile, and the moved windows are then removed from memory.
// The file manager and its write handle are created on first use.
//
// Returns TSDB_CODE_SUCCESS (also when there is nothing in memory) or an error code. After a
// failure that follows getFreeBlock(), the region is returned to the free-block list.
int32_t moveSlidingGrpMemCache(SSlidingTaskDSMgr* pSlidingTaskMgr, SSlidingGrpMgr* pSlidingGrp) {
  if (pSlidingGrp->winDataInMem == NULL || pSlidingGrp->winDataInMem->size == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // Everything read at _exit is declared and initialised before the first goto: jumping over
  // an initialiser would leave groupBlockOffset indeterminate when it is handed to
  // addToFreeBlock().
  int32_t         code = 0;
  int32_t         lino = 0;
  TaosIOVec*      iov = NULL;
  int32_t         moveWinCount = 0;
  int32_t         needSize = 0;
  SBlocksInfoFile fileBlockInfo = {0};
  SFileBlockInfo  groupBlockOffset = {0};

  if (!pSlidingTaskMgr->pFileMgr) {
    code = initStreamDataSinkFile(pSlidingTaskMgr);
    if (code != 0) {
      stError("failed to init stream data sink file, err: %s", terrMsg);
      return code;  // no manager exists: continuing would dereference NULL
    }
    code = openFileForWrite(pSlidingTaskMgr->pFileMgr);
    if (code != 0) {
      destroyStreamDataSinkFile(&pSlidingTaskMgr->pFileMgr);
      return code;  // the manager has just been destroyed
    }
  }
  SDataSinkFileMgr* pFileMgr = pSlidingTaskMgr->pFileMgr;

  int32_t nWin = taosArrayGetSize(pSlidingGrp->winDataInMem);
  iov = taosMemCalloc(nWin, sizeof(TaosIOVec));
  if (iov == NULL) {
    code = terrno;
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    QUERY_CHECK_CODE(code, lino, _exit);
  }

  for (int i = 0; i < nWin; ++i) {
    SSlidingWindowInMem* pSlidingWin = *(SSlidingWindowInMem**)taosArrayGet(pSlidingGrp->winDataInMem, i);
    if (pSlidingWin == NULL || pSlidingWin->dataLen < 0) {
      stError("sliding window in mem is NULL or dataLen < 0, i:%d, pSlidingWin:%p", i, pSlidingWin);
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _exit);
    }
    if (needSize + pSlidingWin->dataLen + sizeof(SSlidingWindowInMem) > DS_FILE_BLOCK_SIZE) {
      break;  // the next window would not fit into this file block
    }
    ++moveWinCount;
    iov[i].iov_base = pSlidingWin;
    iov[i].iov_len = pSlidingWin->dataLen + sizeof(SSlidingWindowInMem);
    needSize += pSlidingWin->dataLen + sizeof(SSlidingWindowInMem);
  }

  if (moveWinCount == 0) {
    // The first window alone exceeds DS_FILE_BLOCK_SIZE. Recording a block with dataLen == 0
    // would make readSlidingDataFromFile() reject it later as an invalid block, so fail here.
    stError("first sliding window does not fit into a file block, nWin:%d", nWin);
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _exit);
  }

  if (pSlidingGrp->blocksInFile == NULL) {
    pSlidingGrp->blocksInFile = taosArrayInit(0, sizeof(SBlocksInfoFile));
    if (pSlidingGrp->blocksInFile == NULL) {
      code = terrno;
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_STREAM_INTERNAL_ERROR;
      }
      QUERY_CHECK_CODE(code, lino, _exit);
    }
  }

  getFreeBlock(pFileMgr, needSize, &groupBlockOffset);
  fileBlockInfo.groupOffset = groupBlockOffset.offset;
  fileBlockInfo.capacity = groupBlockOffset.size;
  fileBlockInfo.dataLen = needSize;
  stDebug("move sliding group memory cache, groupId:%" PRId64
          ", moveWinCount:%d, needSize:%d, "
          "groupOffset:%" PRId64 ", capacity:%" PRId64 ", dataLen:%" PRId64,
          pSlidingGrp->groupId, moveWinCount, needSize, fileBlockInfo.groupOffset, fileBlockInfo.capacity,
          fileBlockInfo.dataLen);

  int64_t ret = taosLSeekFile(pFileMgr->writeFilePtr, fileBlockInfo.groupOffset, SEEK_SET);
  if (ret < 0) {
    code = terrno;
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    QUERY_CHECK_CODE(code, lino, _exit);
  }

  int64_t writeLen = taosWritevFile(pFileMgr->writeFilePtr, iov, moveWinCount);
  if (writeLen != needSize) {
    code = terrno;
    if (code == TSDB_CODE_SUCCESS) {
      // a short write does not necessarily set terrno; it must still be treated as a failure
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    QUERY_CHECK_CODE(code, lino, _exit);
  }

  // Record the block before dropping the windows from memory: if the record cannot be stored,
  // the windows are still in memory and the region goes back to the free list at _exit.
  void* pBlocksInFile = taosArrayPush(pSlidingGrp->blocksInFile, &fileBlockInfo);
  if (pBlocksInFile == NULL) {
    code = terrno;
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    QUERY_CHECK_CODE(code, lino, _exit);
  }

  taosArrayRemoveBatch(pSlidingGrp->winDataInMem, 0, moveWinCount, destroySlidingWindowInMemPP);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to move sliding group memory cache, code: %d, lineno:%d", code, lino);
    // groupBlockOffset is still zeroed when the failure happened before getFreeBlock();
    // addToFreeBlock() ignores regions with size <= 0.
    (void)addToFreeBlock(pFileMgr, &groupBlockOffset);
  }
  taosMemoryFree(iov);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc