• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.47
/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "tss.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20
#include "vnd.h"
21

22
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
242,965,690✔
23
  int32_t     code = 0;
242,965,690✔
24
  int32_t     lino;
25
  const char *path = pFD->path;
242,965,690✔
26
  int32_t     szPage = pFD->szPage;
242,994,690✔
27
  int32_t     flag = pFD->flag;
242,996,995✔
28
  int64_t     lc_size = 0;
242,969,390✔
29

30
  pFD->pFD = taosOpenFile(path, flag);
242,992,126✔
31
  if (pFD->pFD == NULL) {
242,978,110✔
32
    if (TD_FILE_READ == flag) {
×
33
      int32_t expired = grantCheck(TSDB_GRANT_SHARED_STORAGE);
×
34
      if (expired && tsSsEnabled) {
×
35
        tsdbWarn("s3 grant expired: %d", expired);
×
36
        tsSsEnabled = false;
×
37
      } else if (!expired && tsSsEnabled) {
×
38
        tsSsEnabled = true;
×
39
      }
40
    }
41

42
    if (tsSsEnabled && pFD->lcn > 1 && !strncmp(path + strlen(path) - 5, ".data", 5)) {
×
43
      char lc_path[TSDB_FILENAME_LEN];
×
44
      tstrncpy(lc_path, path, TSDB_FQDN_LEN);
×
45

46
      int32_t     vid = 0;
×
47
      const char *object_name = taosDirEntryBaseName((char *)path);
×
48
      (void)sscanf(object_name, "v%df%dver%" PRId64, &vid, &pFD->fid, &pFD->cid);
×
49

50
      char *dot = strrchr(lc_path, '.');
×
51
      if (!dot) {
×
52
        tsdbError("unexpected path: %s", lc_path);
×
53
        TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
×
54
      }
55
      snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lc_path), "%d.data", pFD->lcn);
×
56

57
      pFD->pFD = taosOpenFile(lc_path, flag);
×
58
      if (pFD->pFD == NULL) {
×
59
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
60
      }
61
      if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
×
62
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
63
      }
64
    } else {
65
      tsdbInfo("no file: %s", path);
×
66
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
67
    }
68
    pFD->ssFile = 1;
×
69
  }
70

71
  pFD->pBuf = taosMemoryCalloc(1, szPage);
243,007,676✔
72
  if (pFD->pBuf == NULL) {
242,978,253✔
73
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
74
  }
75

76
  if (lc_size > 0) {
242,983,256✔
77
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
78
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
79

80
    pFD->szFile = lc_size + chunksize * (pFD->lcn - 1);
×
81
  }
82

83
  // not check file size when reading data files.
84
  if (flag != TD_FILE_READ /* && !pFD->s3File*/) {
242,983,256✔
85
    if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
18,232,335✔
86
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
87
    }
88
  }
89

90
  if (pFD->szFile % szPage != 0) {
242,972,186✔
91
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
92
  }
93
  pFD->szFile = pFD->szFile / szPage;
242,993,708✔
94

95
_exit:
243,048,982✔
96
  if (code) {
243,037,994✔
97
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
98
  }
99
  return code;
243,037,994✔
100
}
101

102
// =============== PAGE-WISE FILE ===============
103
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
250,932,423✔
104
  int32_t  code = 0;
250,932,423✔
105
  int32_t  lino;
106
  STsdbFD *pFD = NULL;
250,932,423✔
107
  int32_t  szPage = pTsdb->pVnode->config.tsdbPageSize;
250,932,423✔
108

109
  *ppFD = NULL;
250,986,271✔
110

111
  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
250,979,213✔
112
  if (pFD == NULL) {
250,807,304✔
113
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
114
  }
115

116
  pFD->path = (char *)&pFD[1];
250,807,304✔
117
  memcpy(pFD->path, path, strlen(path) + 1);
250,820,446✔
118
  pFD->szPage = szPage;
250,973,191✔
119
  pFD->flag = flag;
250,966,718✔
120
  pFD->szPage = szPage;
250,972,766✔
121
  pFD->pgno = 0;
250,978,342✔
122
  pFD->lcn = lcn;
250,951,568✔
123
  pFD->pTsdb = pTsdb;
250,933,199✔
124

125
  *ppFD = pFD;
250,966,997✔
126

127
_exit:
250,954,489✔
128
  if (code) {
250,954,489✔
129
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
×
130
  }
131
  return code;
250,894,169✔
132
}
133

134
void tsdbCloseFile(STsdbFD **ppFD) {
250,955,510✔
135
  STsdbFD *pFD = *ppFD;
250,955,510✔
136
  if (pFD) {
250,970,512✔
137
    taosMemoryFree(pFD->pBuf);
250,984,963✔
138
    int32_t code = taosCloseFile(&pFD->pFD);
250,966,037✔
139
    if (code) {
250,893,655✔
140
      tsdbError("failed to close file: %s, code:%d reason:%s", pFD->path, code, tstrerror(code));
×
141
    } else {
142
      tsdbTrace("close file: %s", pFD->path);
250,893,655✔
143
    }
144
    taosMemoryFree(pFD);
250,866,966✔
145
    *ppFD = NULL;
250,950,315✔
146
  }
147
}
250,939,795✔
148

149
static int32_t tsdbWriteFilePage(STsdbFD *pFD, SEncryptData *encryptData) {
604,583,971✔
150
  int32_t code = 0;
604,583,971✔
151
  int32_t lino;
152

153
  if (!pFD->pFD) {
604,583,971✔
154
    code = tsdbOpenFileImpl(pFD);
18,232,295✔
155
    TSDB_CHECK_CODE(code, lino, _exit);
18,223,950✔
156
  }
157

158
  if (pFD->pgno > 0) {
604,584,187✔
159
    int64_t offset = PAGE_OFFSET(pFD->pgno, pFD->szPage);
586,369,750✔
160
    if (pFD->ssFile && pFD->lcn > 1) {
586,363,604✔
161
      SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
162
      int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
163
      int64_t    chunkoffset = chunksize * (pFD->lcn - 1);
×
164

165
      offset -= chunkoffset;
×
166
    }
167

168
    int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
586,364,975✔
169
    if (n < 0) {
586,367,920✔
170
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
171
    }
172

173
    code = taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
586,367,920✔
174
    TSDB_CHECK_CODE(code, lino, _exit);
586,376,783✔
175

176
    if (encryptData != NULL && encryptData->encryptAlgrName[0] != '\0') {
586,376,783✔
177
      // if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_TSDB) == DND_CS_TSDB){
178
      unsigned char PacketData[128];
1,530✔
179
      int           newLen = 0;
1,530✔
180
      int32_t       count = 0;
1,530✔
181
      while (count < pFD->szPage) {
50,490✔
182
        SCryptOpts opts = {0};
48,960✔
183
        opts.len = 128;
48,960✔
184
        opts.source = pFD->pBuf + count;
48,960✔
185
        opts.result = PacketData;
48,960✔
186
        opts.unitLen = 128;
48,960✔
187
        opts.pOsslAlgrName = encryptData->encryptAlgrName;
48,960✔
188
        tstrncpy(opts.key, encryptData->encryptKey, ENCRYPT_KEY_LEN + 1);
48,960✔
189

190
        newLen = CBC_Encrypt(&opts);
48,960✔
191
        if (newLen != opts.len) TSDB_CHECK_CODE(code = newLen, lino, _exit);
48,960✔
192

193
        memcpy(pFD->pBuf + count, PacketData, newLen);
48,960✔
194
        count += newLen;
48,960✔
195
      }
196
      // tsdbDebug("CBC Encrypt count:%d %s", count, __FUNCTION__);
197
    }
198

199
    n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
586,372,394✔
200
    if (n < 0) {
586,365,251✔
201
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
202
    }
203

204
    if (pFD->szFile < pFD->pgno) {
586,365,251✔
205
      pFD->szFile = pFD->pgno;
585,903,457✔
206
    }
207
  }
208
  pFD->pgno = 0;
604,598,787✔
209

210
_exit:
604,607,562✔
211
  if (code) {
604,607,562✔
212
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
213
  }
214
  return code;
604,588,315✔
215
}
216

217
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, SEncryptData *encryptData) {
521,973,749✔
218
  int32_t code = 0;
521,973,749✔
219
  int32_t lino;
220

221
  if (!pFD->pFD) {
521,973,749✔
222
    code = tsdbOpenFileImpl(pFD);
×
223
    TSDB_CHECK_CODE(code, lino, _exit);
×
224
  }
225

226
  int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
522,046,016✔
227
  if (pFD->lcn > 1) {
522,016,394✔
228
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
229
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
230
    int64_t    chunkoffset = chunksize * (pFD->lcn - 1);
×
231

232
    offset -= chunkoffset;
×
233
  }
234

235
  // seek
236
  int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
522,025,950✔
237
  if (n < 0) {
522,049,897✔
238
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
239
  }
240

241
  // read
242
  n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
522,049,897✔
243
  if (n < 0) {
521,971,833✔
244
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
245
  } else if (n < pFD->szPage) {
521,971,833✔
246
    tsdbError(
×
247
        "vgId:%d %s failed at %s:%d since read file size is less than page size, "
248
        "read size: %" PRId64 ", page size: %d, fname:%s, pgno:%" PRId64,
249
        TD_VID(pFD->pTsdb->pVnode), __func__, __FILE__, __LINE__, n, pFD->szPage, pFD->path, pFD->pgno);
250
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
251
  }
252
  //}
253

254
  if (encryptData != NULL && encryptData->encryptAlgrName[0] != '\0') {
522,060,911✔
255
    // if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_TSDB) == DND_CS_TSDB){
256
    unsigned char PacketData[128];
×
257
    int           newLen = 0;
×
258

259
    int32_t count = 0;
×
260
    while (count < pFD->szPage) {
×
261
      SCryptOpts opts = {0};
×
262
      opts.len = 128;
×
263
      opts.source = pFD->pBuf + count;
×
264
      opts.result = PacketData;
×
265
      opts.unitLen = 128;
×
266
      opts.pOsslAlgrName = encryptData->encryptAlgrName;
×
267
      tstrncpy(opts.key, encryptData->encryptKey, ENCRYPT_KEY_LEN + 1);
×
268

269
      newLen = CBC_Decrypt(&opts);
×
270
      if (newLen != opts.len) TSDB_CHECK_CODE(code = newLen, lino, _exit);
×
271

272
      memcpy(pFD->pBuf + count, PacketData, newLen);
×
273
      count += newLen;
×
274
    }
275
    // tsdbDebug("CBC Decrypt count:%d %s", count, __FUNCTION__);
276
  }
277

278
  // check
279
  if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
824,297,468✔
280
    tsdbError("vgId:%d %s failed at %s:%d since checksum mismatch, fname:%s, pgno:%" PRId64, TD_VID(pFD->pTsdb->pVnode),
×
281
              __func__, __FILE__, __LINE__, pFD->path, pgno);
282
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
283
  }
284

285
  pFD->pgno = pgno;
521,961,407✔
286

287
_exit:
522,218,593✔
288
  if (code) {
522,218,593✔
289
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
290
  }
291
  return code;
522,020,218✔
292
}
293

294
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, SEncryptData *encryptData) {
463,075,148✔
295
  int32_t code = 0;
463,075,148✔
296
  int32_t lino;
297
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
463,075,148✔
298
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
463,156,500✔
299
  int64_t bOffset = fOffset % pFD->szPage;
463,151,828✔
300
  int64_t n = 0;
463,156,896✔
301

302
  do {
303
    if (pFD->pgno != pgno) {
1,031,299,117✔
304
      code = tsdbWriteFilePage(pFD, encryptData);
586,354,493✔
305
      TSDB_CHECK_CODE(code, lino, _exit);
586,343,288✔
306

307
      if (pgno <= pFD->szFile) {
586,343,288✔
308
        code = tsdbReadFilePage(pFD, pgno, encryptData);
466,806✔
309
        TSDB_CHECK_CODE(code, lino, _exit);
466,806✔
310
      } else {
311
        pFD->pgno = pgno;
585,880,419✔
312
      }
313
    }
314

315
    int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
1,031,220,588✔
316
    memcpy(pFD->pBuf + bOffset, pBuf + n, nWrite);
1,031,226,330✔
317

318
    pgno++;
1,031,235,553✔
319
    bOffset = 0;
1,031,235,553✔
320
    n += nWrite;
1,031,235,553✔
321
  } while (n < size);
1,031,235,553✔
322

323
_exit:
463,093,332✔
324
  if (code) {
463,093,332✔
325
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
326
  }
327
  return code;
463,098,216✔
328
}
329

330
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, SEncryptData *encryptData) {
1,566,870,183✔
331
  int32_t code = 0;
1,566,870,183✔
332
  int32_t lino;
333
  int64_t n = 0;
1,566,870,183✔
334
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
1,566,870,183✔
335
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
1,567,011,311✔
336
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
1,567,118,597✔
337
  int64_t bOffset = fOffset % pFD->szPage;
1,567,123,835✔
338

339
  if (bOffset >= szPgCont) {
1,567,158,935✔
340
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
341
  }
342

343
  while (n < size) {
2,147,483,647✔
344
    if (pFD->pgno != pgno) {
1,776,558,979✔
345
      code = tsdbReadFilePage(pFD, pgno, encryptData);
521,575,103✔
346
      TSDB_CHECK_CODE(code, lino, _exit);
521,535,876✔
347
    }
348

349
    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
1,776,612,313✔
350
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
1,776,612,313✔
351

352
    n += nRead;
1,776,495,868✔
353
    pgno++;
1,776,495,868✔
354
    bOffset = 0;
1,776,495,868✔
355
  }
356

357
_exit:
1,567,095,824✔
358
  if (code) {
1,567,095,824✔
359
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
360
  }
361
  return code;
1,566,911,260✔
362
}
363

364
#ifdef USE_SHARED_STORAGE
365
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, uint8_t **ppBlock) {
×
366
  int32_t code = 0, lino, vid = TD_VID(pFD->pTsdb->pVnode);
×
367

368
  char* buf = taosMemoryCalloc(1, size);
×
369
  if (buf == NULL) {
×
370
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
371
  }
372

373
  SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
374
  int64_t szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
375

376
  for (int64_t n = 0, nRead = 0; n < size; n += nRead, offset += nRead) {
×
377
    int chunk = offset / szChunk + 1;
×
378

379
    if (chunk >= pFD->lcn) {
×
380
      if (taosLSeekFile(pFD->pFD, offset - szChunk * (pFD->lcn - 1), SEEK_SET) < 0) {
×
381
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
382
      }
383

384
      int64_t toRead = size - n;
×
385
      nRead = taosReadFile(pFD->pFD, buf + n, toRead);
×
386
      if (nRead < 0) {
×
387
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
388
      } else if (nRead < toRead) {
×
389
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
390
      }
391
    } else if (tsSsEnabled) {
×
392
      int64_t toRead = TMIN(szChunk - offset % szChunk, size - n);
×
393
      nRead = toRead;
×
394

395
      char path[TSDB_FILENAME_LEN];
×
396
      sprintf(path, "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, pFD->fid, vid, pFD->fid, pFD->cid, chunk);
×
397

398
      code = tssReadFileFromDefault(path, offset % szChunk, buf + n, &nRead);
×
399
      TSDB_CHECK_CODE(code, lino, _exit);
×
400
      if (nRead < toRead) {
×
401
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
402
      }
403
    } else {
404
      TSDB_CHECK_CODE(code = TSDB_CODE_OPS_NOT_SUPPORT, lino, _exit);
×
405
    }
406
  }
407

408
_exit:
×
409
  if (code) {
×
410
    TSDB_ERROR_LOG(vid, lino, code);
×
411
    taosMemoryFree(buf);
×
412
  } else {
413
    *ppBlock = (uint8_t *)buf;
×
414
  }
415
  return code;
×
416
}
417
#endif
418

419

420

421
static int32_t tsdbReadFileSs(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
×
422
#ifdef USE_SHARED_STORAGE
423
  int32_t code = 0;
×
424
  int32_t lino;
425
  int64_t n = 0;
×
426
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
×
427
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
×
428
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
×
429
  int64_t bOffset = fOffset % pFD->szPage;
×
430

431
  if (bOffset >= szPgCont) {
×
432
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
433
  }
434

435
  // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
436
  // 2, fetch pgnoStart ~ pgnoEnd from s3
437
  // 3, store pgs to pcache & last pg to pFD->pBuf
438
  // 4, deliver pgs to [pBuf, pBuf + size)
439

440
  while (n < size) {
×
441
    if (pFD->pgno != pgno) {
×
442
      LRUHandle *handle = NULL;
×
443
      code = tsdbCacheGetPageSs(pFD->pTsdb->pgCache, pFD, pgno, &handle);
×
444
      if (code != TSDB_CODE_SUCCESS) {
×
445
        if (handle) {
×
446
          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
447
        }
448
        TSDB_CHECK_CODE(code, lino, _exit);
×
449
      }
450

451
      if (!handle) {
×
452
        break;
×
453
      }
454

455
      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
×
456
      memcpy(pFD->pBuf, pPage, pFD->szPage);
×
457
      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
458

459
      // check
460
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
×
461
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
462
      }
463

464
      pFD->pgno = pgno;
×
465
    }
466

467
    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
×
468
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
×
469

470
    n += nRead;
×
471
    ++pgno;
×
472
    bOffset = 0;
×
473
  }
474

475
  if (n < size) {
×
476
    // 2, retrieve pgs from s3
477
    uint8_t *pBlock = NULL;
×
478
    int64_t  retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
×
479
    int64_t  pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;
×
480

481
    if (szHint > 0) {
×
482
      pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
×
483
    }
484

485
    int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
×
486

487
    code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, &pBlock);
×
488
    TSDB_CHECK_CODE(code, lino, _exit);
×
489
    // 3, Store Pages in Cache
490
    int nPage = pgnoEnd - pgno + 1;
×
491
    for (int i = 0; i < nPage; ++i) {
×
492
      if (pFD->szFile != pgno) {  // DONOT cache last volatile page
×
493
        tsdbCacheSetPageSs(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
×
494
      }
495

496
      if (szHint > 0 && n >= size) {
×
497
        ++pgno;
×
498
        continue;
×
499
      }
500
      memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
×
501

502
      // check
503
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
×
504
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
505
      }
506

507
      pFD->pgno = pgno;
×
508

509
      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
×
510
      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
×
511

512
      n += nRead;
×
513
      ++pgno;
×
514
      bOffset = 0;
×
515
    }
516

517
    taosMemoryFree(pBlock);
×
518
  }
519

520
_exit:
×
521
  if (code) {
×
522
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
523
  }
524
  return code;
×
525
#else
526
  return TSDB_CODE_INTERNAL_ERROR;
527
#endif
528
}
529

530
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
1,566,783,492✔
531
                     SEncryptData *encryptData) {
532
  int32_t code = 0;
1,566,783,492✔
533
  int32_t lino;
534

535
  if (!pFD->pFD) {
1,566,783,492✔
536
    code = tsdbOpenFileImpl(pFD);
224,793,701✔
537
    TSDB_CHECK_CODE(code, lino, _exit);
224,754,085✔
538
  }
539

540
  if (pFD->ssFile && pFD->lcn > 1 /* && tsSsBlockSize < 0*/) {
1,566,936,506✔
541
    code = tsdbReadFileSs(pFD, offset, pBuf, size, szHint);
×
542
    TSDB_CHECK_CODE(code, lino, _exit);
×
543
  } else {
544
    code = tsdbReadFileImp(pFD, offset, pBuf, size, encryptData);
1,566,976,178✔
545
    TSDB_CHECK_CODE(code, lino, _exit);
1,566,919,253✔
546
  }
547

548
_exit:
1,566,919,253✔
549
  if (code) {
1,566,919,253✔
550
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
551
  }
552
  return code;
1,566,902,604✔
553
}
554

555
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
924,400,891✔
556
                             SEncryptData *encryptData) {
557
  int32_t code;
558
  int32_t lino;
559

560
  code = tBufferEnsureCapacity(buffer, buffer->size + size);
924,400,891✔
561
  TSDB_CHECK_CODE(code, lino, _exit);
924,418,660✔
562

563
  code = tsdbReadFile(pFD, offset, (uint8_t *)tBufferGetDataEnd(buffer), size, szHint, encryptData);
924,418,660✔
564
  TSDB_CHECK_CODE(code, lino, _exit);
924,726,796✔
565

566
  buffer->size += size;
924,726,796✔
567

568
_exit:
924,770,883✔
569
  if (code) {
924,770,883✔
570
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
571
  }
572
  return code;
924,689,628✔
573
}
574

575
int32_t tsdbFsyncFile(STsdbFD *pFD, SEncryptData *encryptData) {
18,233,991✔
576
  int32_t code = 0;
18,233,991✔
577
  int32_t lino;
578

579
  code = tsdbWriteFilePage(pFD, encryptData);
18,233,991✔
580
  TSDB_CHECK_CODE(code, lino, _exit);
18,235,689✔
581

582
  if (taosFsyncFile(pFD->pFD) < 0) {
18,235,689✔
583
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ERRNO), lino, _exit);
×
584
  }
585

586
_exit:
18,237,040✔
587
  if (code) {
18,237,040✔
588
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
589
  }
590
  return code;
18,236,240✔
591
}
592

593
// SDataFReader ====================================================
594
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
×
595
  int32_t       code = 0;
×
596
  int32_t       lino = 0;
×
597
  SDataFReader *pReader = NULL;
×
598
  int32_t       szPage = pTsdb->pVnode->config.tsdbPageSize;
×
599
  char          fname[TSDB_FILENAME_LEN];
×
600

601
  // alloc
602
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
×
603
  if (pReader == NULL) {
×
604
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
605
  }
606
  pReader->pTsdb = pTsdb;
×
607
  pReader->pSet = pSet;
×
608

609
  // head
610
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
×
611
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD, 0);
×
612
  TSDB_CHECK_CODE(code, lino, _exit);
×
613

614
  // data
615
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
×
616
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD, 0);
×
617
  TSDB_CHECK_CODE(code, lino, _exit);
×
618

619
  // sma
620
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
×
621
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD, 0);
×
622
  TSDB_CHECK_CODE(code, lino, _exit);
×
623

624
  // stt
625
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
×
626
    tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
×
627
    code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt], 0);
×
628
    TSDB_CHECK_CODE(code, lino, _exit);
×
629
  }
630

631
_exit:
×
632
  if (code) {
×
633
    *ppReader = NULL;
×
634
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
635

636
    if (pReader) {
×
637
      for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
×
638
      tsdbCloseFile(&pReader->pSmaFD);
×
639
      tsdbCloseFile(&pReader->pDataFD);
×
640
      tsdbCloseFile(&pReader->pHeadFD);
×
641
      taosMemoryFree(pReader);
×
642
    }
643
  } else {
644
    *ppReader = pReader;
×
645
  }
646
  return code;
×
647
}
648

649
void tsdbDataFReaderClose(SDataFReader **ppReader) {
×
650
  if (*ppReader == NULL) return;
×
651

652
  // head
653
  tsdbCloseFile(&(*ppReader)->pHeadFD);
×
654

655
  // data
656
  tsdbCloseFile(&(*ppReader)->pDataFD);
×
657

658
  // sma
659
  tsdbCloseFile(&(*ppReader)->pSmaFD);
×
660

661
  // stt
662
  for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
×
663
    if ((*ppReader)->aSttFD[iStt]) {
×
664
      tsdbCloseFile(&(*ppReader)->aSttFD[iStt]);
×
665
    }
666
  }
667

668
  for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) {
×
669
    tFree((*ppReader)->aBuf[iBuf]);
×
670
  }
671
  taosMemoryFree(*ppReader);
×
672
  *ppReader = NULL;
×
673
}
674

675
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
×
676
  int32_t    code = 0;
×
677
  int32_t    lino;
678
  SHeadFile *pHeadFile = pReader->pSet->pHeadF;
×
679
  int64_t    offset = pHeadFile->offset;
×
680
  int64_t    size = pHeadFile->size - offset;
×
681

682
  taosArrayClear(aBlockIdx);
×
683
  if (size == 0) {
×
684
    return code;
×
685
  }
686

687
  // alloc
688
  code = tRealloc(&pReader->aBuf[0], size);
×
689
  TSDB_CHECK_CODE(code, lino, _exit);
×
690

691
  // read
692
  SEncryptData *pEncryptData = &(pReader->pTsdb->pVnode->config.tsdbCfg.encryptData);
×
693
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, pEncryptData);
×
694
  TSDB_CHECK_CODE(code, lino, _exit);
×
695

696
  // decode
697
  int64_t n = 0;
×
698
  while (n < size) {
×
699
    SBlockIdx blockIdx;
×
700
    n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
×
701

702
    if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
×
703
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
704
    }
705
  }
706
  if (n != size) {
×
707
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
708
  }
709

710
_exit:
×
711
  if (code) {
×
712
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
713
  }
714
  return code;
×
715
}
716

717
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
×
718
  int32_t   code = 0;
×
719
  int32_t   lino;
720
  SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
×
721
  int64_t   offset = pSttFile->offset;
×
722
  int64_t   size = pSttFile->size - offset;
×
723

724
  taosArrayClear(aSttBlk);
×
725
  if (size == 0) {
×
726
    return 0;
×
727
  }
728

729
  // alloc
730
  code = tRealloc(&pReader->aBuf[0], size);
×
731
  TSDB_CHECK_CODE(code, lino, _exit);
×
732

733
  // read
734
  SEncryptData *pEncryptData = &(pReader->pTsdb->pVnode->config.tsdbCfg.encryptData);
×
735
  code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0, pEncryptData);
×
736
  TSDB_CHECK_CODE(code, lino, _exit);
×
737

738
  // decode
739
  int64_t n = 0;
×
740
  while (n < size) {
×
741
    SSttBlk sttBlk;
×
742
    n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk);
×
743

744
    if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
×
745
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
746
    }
747
  }
748
  if (n != size) {
×
749
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
750
  }
751

752
_exit:
×
753
  if (code) {
×
754
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
755
  }
756
  return code;
×
757
}
758

759
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
×
760
  int32_t code = 0;
×
761
  int32_t lino;
762
  int64_t offset = pBlockIdx->offset;
×
763
  int64_t size = pBlockIdx->size;
×
764

765
  // alloc
766
  code = tRealloc(&pReader->aBuf[0], size);
×
767
  TSDB_CHECK_CODE(code, lino, _exit);
×
768

769
  // read
770
  SEncryptData *pEncryptData = &(pReader->pTsdb->pVnode->config.tsdbCfg.encryptData);
×
771

772
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, pEncryptData);
×
773
  TSDB_CHECK_CODE(code, lino, _exit);
×
774

775
  // decode
776
  int32_t n;
×
777
  code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
×
778
  if (code) goto _exit;
×
779
  if (n != size) {
×
780
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
781
  }
782

783
_exit:
×
784
  if (code) {
×
785
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
786
  }
787
  return code;
×
788
}
789

790
// SDelFReader ====================================================
791
struct SDelFReader {
792
  STsdb   *pTsdb;
793
  SDelFile fDel;
794
  STsdbFD *pReadH;
795
  uint8_t *aBuf[1];
796
};
797

798
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
×
799
  int32_t      code = 0;
×
800
  int32_t      lino = 0;
×
801
  char         fname[TSDB_FILENAME_LEN];
×
802
  SDelFReader *pDelFReader = NULL;
×
803

804
  // alloc
805
  pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
×
806
  if (pDelFReader == NULL) {
×
807
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
808
  }
809

810
  // open impl
811
  pDelFReader->pTsdb = pTsdb;
×
812
  pDelFReader->fDel = *pFile;
×
813

814
  tsdbDelFileName(pTsdb, pFile, fname);
×
815
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH, 0);
×
816
  TSDB_CHECK_CODE(code, lino, _exit);
×
817

818
_exit:
×
819
  if (code) {
×
820
    *ppReader = NULL;
×
821
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
×
822
    taosMemoryFree(pDelFReader);
×
823
  } else {
824
    *ppReader = pDelFReader;
×
825
  }
826
  return code;
×
827
}
828

829
void tsdbDelFReaderClose(SDelFReader **ppReader) {
×
830
  int32_t      code = 0;
×
831
  SDelFReader *pReader = *ppReader;
×
832

833
  if (pReader) {
×
834
    tsdbCloseFile(&pReader->pReadH);
×
835
    for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
×
836
      tFree(pReader->aBuf[iBuf]);
×
837
    }
838
    taosMemoryFree(pReader);
×
839
  }
840

841
  *ppReader = NULL;
×
842
}
×
843

844
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
845
  return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, INT64_MAX);
×
846
}
847

848
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
×
849
  int32_t code = 0;
×
850
  int32_t lino;
851
  int64_t offset = pDelIdx->offset;
×
852
  int64_t size = pDelIdx->size;
×
853
  int64_t n;
854

855
  taosArrayClear(aDelData);
×
856

857
  // alloc
858
  code = tRealloc(&pReader->aBuf[0], size);
×
859
  TSDB_CHECK_CODE(code, lino, _exit);
×
860

861
  // read
862
  SEncryptData *pEncryptData = &(pReader->pTsdb->pVnode->config.tsdbCfg.encryptData);
×
863

864
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, pEncryptData);
×
865
  TSDB_CHECK_CODE(code, lino, _exit);
×
866

867
  // // decode
868
  n = 0;
×
869
  while (n < size) {
×
870
    SDelData delData;
×
871
    n += tGetDelData(pReader->aBuf[0] + n, &delData);
×
872

873
    if (delData.version > maxVer) {
×
874
      continue;
×
875
    }
876
    if (taosArrayPush(aDelData, &delData) == NULL) {
×
877
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
878
    }
879
  }
880

881
  if (n != size) {
×
882
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
883
  }
884

885
_exit:
×
886
  if (code) {
×
887
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
888
  }
889
  return code;
×
890
}
891

892
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
×
893
  int32_t code = 0;
×
894
  int32_t lino;
895
  int32_t n;
896
  int64_t offset = pReader->fDel.offset;
×
897
  int64_t size = pReader->fDel.size - offset;
×
898

899
  taosArrayClear(aDelIdx);
×
900

901
  // alloc
902
  code = tRealloc(&pReader->aBuf[0], size);
×
903
  TSDB_CHECK_CODE(code, lino, _exit);
×
904

905
  // read
906
  SEncryptData *pEncryptData = &(pReader->pTsdb->pVnode->config.tsdbCfg.encryptData);
×
907

908
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, pEncryptData);
×
909
  TSDB_CHECK_CODE(code, lino, _exit);
×
910

911
  // decode
912
  n = 0;
×
913
  while (n < size) {
×
914
    SDelIdx delIdx;
×
915

916
    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
×
917

918
    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
×
919
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
920
    }
921
  }
922

923
  if (n != size) {
×
924
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
925
  }
926

927
_exit:
×
928
  if (code) {
×
929
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
930
  }
931
  return code;
×
932
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc