• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4858

17 Nov 2025 09:53AM UTC coverage: 64.048% (-0.09%) from 64.135%
#4858

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

218 of 311 new or added lines in 32 files covered. (70.1%)

4856 existing lines in 134 files now uncovered.

151096 of 235910 relevant lines covered (64.05%)

115767925.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.38
/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "tss.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20
#include "vnd.h"
21

22
/**
 * Lazily open the file behind a handle and allocate its one-page buffer.
 *
 * The path stored in the handle is tried first. If that open fails while
 * shared storage is enabled, the handle carries a chunk number (lcn) above 1
 * and the path ends in ".data", the name is rewritten to "<base>.<lcn>.data"
 * and that file is opened instead.
 *
 * On success pFD->szFile is expressed in pages; a byte size that is not a
 * multiple of the page size is rejected with TSDB_CODE_INVALID_PARA. The size
 * of a plain local file is only queried when the handle was not opened with
 * TD_FILE_READ.
 *
 * On failure everything acquired here (file descriptor, page buffer) is
 * released again, so the handle returns to its unopened state and a later
 * call retries the open instead of running with an open file and no buffer.
 *
 * @param pFD handle created by tsdbOpenFile()
 * @return 0 on success, otherwise an error code
 */
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
  int32_t     code = 0;
  int32_t     lino;
  const char *path = pFD->path;
  int32_t     szPage = pFD->szPage;
  int32_t     flag = pFD->flag;
  int64_t     lc_size = 0;

  pFD->pFD = taosOpenFile(path, flag);
  if (pFD->pFD == NULL) {
    size_t pathLen = strlen(path);

    // The length guard keeps `path + pathLen - 5` inside the string.
    if (tsSsEnabled && pFD->lcn > 1 && pathLen >= 5 && !strncmp(path + pathLen - 5, ".data", 5)) {
      char lc_path[TSDB_FILENAME_LEN];
      // Bound every write to lc_path by the buffer's own size.
      tstrncpy(lc_path, path, sizeof(lc_path));

      int32_t     vid = 0;
      const char *object_name = taosDirEntryBaseName((char *)path);
      // Best effort: fid/cid are taken from the file name when it matches.
      (void)sscanf(object_name, "v%df%dver%" PRId64, &vid, &pFD->fid, &pFD->cid);

      char *dot = strrchr(lc_path, '.');
      if (!dot) {
        tsdbError("unexpected path: %s", lc_path);
        TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
      }
      // Replace the "data" extension with "<lcn>.data".
      snprintf(dot + 1, sizeof(lc_path) - (size_t)(dot + 1 - lc_path), "%d.data", pFD->lcn);

      pFD->pFD = taosOpenFile(lc_path, flag);
      if (pFD->pFD == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
      if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
    } else {
      tsdbInfo("no file: %s", path);
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
    pFD->ssFile = 1;
  }

  pFD->pBuf = taosMemoryCalloc(1, szPage);
  if (pFD->pBuf == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  if (lc_size > 0) {
    // Logical size = all preceding chunks plus the size of the opened chunk.
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;

    pFD->szFile = lc_size + chunksize * (pFD->lcn - 1);
  }

  // not check file size when reading data files.
  if (flag != TD_FILE_READ /* && !pFD->s3File*/) {
    if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  if (pFD->szFile % szPage != 0) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }
  pFD->szFile = pFD->szFile / szPage;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);

    // Undo partial initialisation so callers never see an open file without a
    // page buffer (they only re-enter this function while pFD->pFD is NULL).
    if (pFD->pFD != NULL) {
      (void)taosCloseFile(&pFD->pFD);
      pFD->pFD = NULL;
    }
    taosMemoryFree(pFD->pBuf);
    pFD->pBuf = NULL;
    pFD->ssFile = 0;
  }
  return code;
}
91

92
// =============== PAGE-WISE FILE ===============
93
/**
 * Allocate a page-wise file handle for `path`.
 *
 * Only the handle is created here; the underlying file is opened later, on
 * first use, by tsdbOpenFileImpl(). The path is copied into the same
 * allocation, directly behind the STsdbFD structure.
 *
 * @param path  file path (copied)
 * @param pTsdb tsdb the file belongs to; supplies the page size
 * @param flag  open flags, stored for the deferred open
 * @param ppFD  receives the new handle, or NULL on failure
 * @param lcn   chunk number stored in the handle
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
  int32_t  code = 0;
  int32_t  lino;
  STsdbFD *pFD = NULL;
  int32_t  szPage = pTsdb->pVnode->config.tsdbPageSize;
  size_t   szPath = strlen(path) + 1;  // includes the terminating NUL

  *ppFD = NULL;

  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + szPath);
  if (pFD == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // The path lives in the trailing bytes of the handle allocation.
  pFD->path = (char *)&pFD[1];
  memcpy(pFD->path, path, szPath);
  pFD->szPage = szPage;
  pFD->flag = flag;
  pFD->pgno = 0;
  pFD->lcn = lcn;
  pFD->pTsdb = pTsdb;

  *ppFD = pFD;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
  }
  return code;
}
123

124
/**
 * Release a handle created by tsdbOpenFile(): frees the page buffer, closes
 * the underlying file, frees the handle and resets *ppFD to NULL.
 * Does nothing when *ppFD is already NULL.
 */
void tsdbCloseFile(STsdbFD **ppFD) {
  STsdbFD *pHandle = *ppFD;
  if (pHandle == NULL) {
    return;
  }

  taosMemoryFree(pHandle->pBuf);

  int32_t rc = taosCloseFile(&pHandle->pFD);
  if (rc == 0) {
    tsdbTrace("close file: %s", pHandle->path);
  } else {
    tsdbError("failed to close file: %s, code:%d reason:%s", pHandle->path, rc, tstrerror(rc));
  }

  taosMemoryFree(pHandle);
  *ppFD = NULL;
}
238,654,533✔
138

139
/**
 * Flush the page currently held in pFD->pBuf to disk.
 *
 * Opens the file first if that has not happened yet. When a page is buffered
 * (pFD->pgno > 0) its checksum is appended, the page is encrypted in place in
 * 128-byte units if `encryptAlgorithm` is DND_CA_SM4, and it is written at
 * its page offset; pFD->szFile grows if the page lies past the known end.
 * Afterwards pFD->pgno is reset to 0 (no page buffered).
 *
 * @return 0 on success, otherwise an error code
 */
static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (pFD->pFD == NULL) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pFD->pgno > 0) {
    int64_t pos = PAGE_OFFSET(pFD->pgno, pFD->szPage);

    // For a chunked shared-storage file the local file holds only the last
    // chunk, so the position is rebased by the size of the preceding chunks.
    if (pFD->ssFile && pFD->lcn > 1) {
      SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
      int64_t    szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;

      pos -= szChunk * (pFD->lcn - 1);
    }

    if (taosLSeekFile(pFD->pFD, pos, SEEK_SET) < 0) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }

    code = taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (encryptAlgorithm == DND_CA_SM4) {
      unsigned char cipher[128];
      int32_t       done = 0;

      // Encrypt the page unit by unit, overwriting the plaintext in place.
      while (done < pFD->szPage) {
        SCryptOpts opts = {0};
        opts.len = 128;
        opts.source = pFD->pBuf + done;
        opts.result = cipher;
        opts.unitLen = 128;
        tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);

        int nOut = CBC_Encrypt(&opts);

        memcpy(pFD->pBuf + done, cipher, nOut);
        done += nOut;
      }
    }

    if (taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage) < 0) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }

    if (pFD->pgno > pFD->szFile) {
      pFD->szFile = pFD->pgno;
    }
  }
  pFD->pgno = 0;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
204

205
/**
 * Load page `pgno` of the file into pFD->pBuf.
 *
 * Opens the file first if needed, seeks to the page, reads exactly one page,
 * decrypts it in place in 128-byte units when `encryptAlgorithm` is
 * DND_CA_SM4, and verifies the page checksum for every page number above 1.
 * On success pFD->pgno is set to `pgno`.
 *
 * NOTE(review): the chunk rebase below depends on pFD->lcn only, whereas
 * tsdbWriteFilePage additionally requires pFD->ssFile — confirm this
 * difference is intended.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED on a short read or checksum
 *         mismatch; otherwise the error reported by the failing call
 */
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (!pFD->pFD) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
  if (pFD->lcn > 1) {
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
    int64_t    chunkoffset = chunksize * (pFD->lcn - 1);

    offset -= chunkoffset;
  }

  // seek
  int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // read
  n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  } else if (n < pFD->szPage) {
    // Report the page that was requested; pFD->pgno still names the page that
    // was buffered before this call.
    tsdbError(
        "vgId:%d %s failed at %s:%d since read file size is less than page size, "
        "read size: %" PRId64 ", page size: %d, fname:%s, pgno:%" PRId64,
        TD_VID(pFD->pTsdb->pVnode), __func__, __FILE__, __LINE__, n, pFD->szPage, pFD->path, pgno);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  if (encryptAlgorithm == DND_CA_SM4) {
    unsigned char PacketData[128];
    int           NewLen;

    // Decrypt the page unit by unit, overwriting the ciphertext in place.
    int32_t count = 0;
    while (count < pFD->szPage) {
      SCryptOpts opts = {0};
      opts.len = 128;
      opts.source = pFD->pBuf + count;
      opts.result = PacketData;
      opts.unitLen = 128;
      tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);

      NewLen = CBC_Decrypt(&opts);

      memcpy(pFD->pBuf + count, PacketData, NewLen);
      count += NewLen;
    }
  }

  // check
  if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
    tsdbError("vgId:%d %s failed at %s:%d since checksum mismatch, fname:%s, pgno:%" PRId64, TD_VID(pFD->pTsdb->pVnode),
              __func__, __FILE__, __LINE__, pFD->path, pgno);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  pFD->pgno = pgno;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
279

280
/**
 * Write `size` bytes from `pBuf` at logical offset `offset`.
 *
 * The data is copied page by page into pFD->pBuf. Whenever the target page
 * differs from the buffered one, the buffered page is flushed; the target
 * page is then read back if its number does not exceed pFD->szFile, otherwise
 * it is simply adopted as the new buffered page. The last touched page stays
 * buffered until the next flush.
 *
 * The loop body runs at least once, also when `size` is 0.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
                      char *encryptKey) {
  int32_t code = 0;
  int32_t lino;
  int64_t filePos = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t curPage = OFFSET_PGNO(filePos, pFD->szPage);
  int64_t inPage = filePos % pFD->szPage;
  int64_t done = 0;

  for (;;) {
    if (pFD->pgno != curPage) {
      code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (curPage > pFD->szFile) {
        // Page beyond the known end of file: nothing to read back.
        pFD->pgno = curPage;
      } else {
        code = tsdbReadFilePage(pFD, curPage, encryptAlgorithm, encryptKey);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    int64_t nCopy = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - inPage, size - done);
    memcpy(pFD->pBuf + inPage, pBuf + done, nCopy);

    done += nCopy;
    curPage++;
    inPage = 0;

    if (done >= size) {
      break;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
316

317
/**
 * Read `size` bytes at logical offset `offset` from a local file into `pBuf`.
 *
 * Pages are loaded through tsdbReadFilePage() whenever the needed page is not
 * the one currently buffered, and the content part of each page is copied to
 * the output.
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA when the offset falls outside
 *         the content area of its page; otherwise the page-read error
 */
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
                               char *encryptKey) {
  int32_t code = 0;
  int32_t lino;
  int64_t done = 0;
  int64_t filePos = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t curPage = OFFSET_PGNO(filePos, pFD->szPage);
  int32_t szContent = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t inPage = filePos % pFD->szPage;

  if (inPage >= szContent) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }

  // Only the first page is entered at a non-zero offset.
  for (; done < size; curPage++, inPage = 0) {
    if (pFD->pgno != curPage) {
      code = tsdbReadFilePage(pFD, curPage, encryptAlgorithm, encryptKey);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    int64_t nCopy = TMIN(szContent - inPage, size - done);
    memcpy(pBuf + done, pFD->pBuf + inPage, nCopy);
    done += nCopy;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
351

352

353
#ifdef USE_SHARED_STORAGE
354
/**
 * Read `size` raw bytes starting at file offset `offset` into a freshly
 * allocated block.
 *
 * The range is served chunk by chunk: bytes in a chunk numbered pFD->lcn or
 * higher come from the locally opened file (rebased by the preceding chunks);
 * bytes in earlier chunks are fetched through tssReadFileFromDefault() when
 * shared storage is enabled, and are otherwise an unsupported operation.
 *
 * @param ppBlock receives the block on success; the caller frees it with
 *                taosMemoryFree(). Left untouched on failure.
 * @return 0 on success, otherwise an error code
 */
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, uint8_t **ppBlock) {
  int32_t code = 0, lino, vid = TD_VID(pFD->pTsdb->pVnode);

  char *buf = taosMemoryCalloc(1, size);
  if (buf == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
  int64_t    szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;

  for (int64_t n = 0, nRead = 0; n < size; n += nRead, offset += nRead) {
    int chunk = offset / szChunk + 1;

    if (chunk >= pFD->lcn) {
      // Local part: the open file starts at chunk pFD->lcn.
      if (taosLSeekFile(pFD->pFD, offset - szChunk * (pFD->lcn - 1), SEEK_SET) < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }

      int64_t toRead = size - n;
      nRead = taosReadFile(pFD->pFD, buf + n, toRead);
      if (nRead < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      } else if (nRead < toRead) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }
    } else if (tsSsEnabled) {
      // Remote part: never read across a chunk boundary in one request.
      int64_t toRead = TMIN(szChunk - offset % szChunk, size - n);
      nRead = toRead;

      char path[TSDB_FILENAME_LEN];
      int  len = snprintf(path, sizeof(path), "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, pFD->fid, vid, pFD->fid,
                          pFD->cid, chunk);
      if (len < 0 || (size_t)len >= sizeof(path)) {
        // Refuse to request an object under a truncated name.
        TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
      }

      code = tssReadFileFromDefault(path, offset % szChunk, buf + n, &nRead);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (nRead < toRead) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }
    } else {
      TSDB_CHECK_CODE(code = TSDB_CODE_OPS_NOT_SUPPORT, lino, _exit);
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
    taosMemoryFree(buf);
  } else {
    *ppBlock = (uint8_t *)buf;
  }
  return code;
}
406
#endif
407

408

409

410
/**
 * Read `size` bytes at logical offset `offset` from a shared-storage backed
 * file into `pBuf`.
 *
 * Pages are first looked up in the tsdb page cache. At the first page the
 * cache cannot supply, the remaining range (extended to cover `szHint` bytes
 * when szHint > 0) is fetched in one request via tsdbReadFileBlock(); the
 * fetched pages are inserted into the cache and copied to the output.
 *
 * Without USE_SHARED_STORAGE this function returns TSDB_CODE_INTERNAL_ERROR.
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA when the offset falls outside
 *         the content area of its page; TSDB_CODE_FILE_CORRUPTED on a
 *         checksum mismatch; otherwise the error of the failing call
 */
static int32_t tsdbReadFileSs(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
#ifdef USE_SHARED_STORAGE
  int32_t  code = 0;
  int32_t  lino;
  int64_t  n = 0;
  int32_t  szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t  fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t  pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t  bOffset = fOffset % pFD->szPage;
  uint8_t *pBlock = NULL;  // fetched pages; released at _exit on every path

  if (bOffset >= szPgCont) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }

  // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
  // 2, fetch pgnoStart ~ pgnoEnd from s3
  // 3, store pgs to pcache & last pg to pFD->pBuf
  // 4, deliver pgs to [pBuf, pBuf + size)

  while (n < size) {
    if (pFD->pgno != pgno) {
      LRUHandle *handle = NULL;
      code = tsdbCacheGetPageSs(pFD->pTsdb->pgCache, pFD, pgno, &handle);
      if (code != TSDB_CODE_SUCCESS) {
        if (handle) {
          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
        }
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (!handle) {
        // Cache miss: fetch the rest of the range below.
        break;
      }

      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
      memcpy(pFD->pBuf, pPage, pFD->szPage);
      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);

      // check
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }

      pFD->pgno = pgno;
    }

    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

    n += nRead;
    ++pgno;
    bOffset = 0;
  }

  if (n < size) {
    // 2, retrieve pgs from s3
    int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
    int64_t pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;

    if (szHint > 0) {
      pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
    }

    int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;

    code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, &pBlock);
    TSDB_CHECK_CODE(code, lino, _exit);

    // 3, Store Pages in Cache
    int nPage = pgnoEnd - pgno + 1;
    for (int i = 0; i < nPage; ++i) {
      if (pFD->szFile != pgno) {  // DONOT cache last volatile page
        tsdbCacheSetPageSs(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
      }

      // Pages fetched only because of the hint are cached but not delivered.
      if (szHint > 0 && n >= size) {
        ++pgno;
        continue;
      }
      memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);

      // check
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }

      pFD->pgno = pgno;

      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

      n += nRead;
      ++pgno;
      bOffset = 0;
    }
  }

_exit:
  // Freed here so the checksum-failure exit inside the loop cannot leak it.
  taosMemoryFree(pBlock);
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
#else
  return TSDB_CODE_INTERNAL_ERROR;
#endif
}
518

519
/**
 * Read `size` bytes at logical offset `offset` into `pBuf`.
 *
 * Opens the file on first use, then dispatches: handles flagged ssFile with a
 * chunk number above 1 go through tsdbReadFileSs() (which uses `szHint`), all
 * others through tsdbReadFileImp() (which uses the encryption settings).
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
                     int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (pFD->pFD == NULL) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // Evaluated after the open, which is what sets ssFile.
  if (pFD->ssFile && pFD->lcn > 1 /* && tsSsBlockSize < 0*/) {
    code = tsdbReadFileSs(pFD, offset, pBuf, size, szHint);
  } else {
    code = tsdbReadFileImp(pFD, offset, pBuf, size, encryptAlgorithm, encryptKey);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
543

544
/**
 * Append `size` bytes read at logical offset `offset` to `buffer`.
 *
 * The buffer is grown to hold the extra bytes, the data is read directly
 * behind its current end, and buffer->size is advanced only when the read
 * succeeded.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
                             int32_t encryptAlgorithm, char *encryptKey) {
  int32_t lino;
  int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Fetched after the capacity call, which may have moved the storage.
  uint8_t *pDst = (uint8_t *)tBufferGetDataEnd(buffer);
  code = tsdbReadFile(pFD, offset, pDst, size, szHint, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  buffer->size += size;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
563

564
/**
 * Flush the buffered page of `pFD` and fsync the underlying file.
 *
 * @return 0 on success; the flush error, or TAOS_SYSTEM_ERROR(ERRNO) when the
 *         fsync call fails
 */
int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t lino;
  int32_t code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosFsyncFile(pFD->pFD) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
581

582
// SDataFReader ====================================================
583
/**
 * Create a reader over the files of one data file set.
 *
 * Allocates the reader and creates read-only handles for the head, data and
 * sma files and for each of the pSet->nSttF stt files. On failure every
 * handle created so far is closed and the reader is freed.
 *
 * @param ppReader receives the reader, or NULL on failure
 * @param pTsdb    owning tsdb
 * @param pSet     file set to read; the reader keeps this pointer
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SDataFReader *pReader = NULL;
  char          fname[TSDB_FILENAME_LEN];

  // alloc
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pReader->pTsdb = pTsdb;
  pReader->pSet = pSet;

  // head
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // data
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // sma
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // stt
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
    tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
    code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt], 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    *ppReader = NULL;
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));

    if (pReader) {
      // Handles that were never created are still NULL from the calloc and
      // tsdbCloseFile() ignores them.
      for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
      tsdbCloseFile(&pReader->pSmaFD);
      tsdbCloseFile(&pReader->pDataFD);
      tsdbCloseFile(&pReader->pHeadFD);
      taosMemoryFree(pReader);
    }
  } else {
    *ppReader = pReader;
  }
  return code;
}
637

638
/**
 * Destroy a reader created by tsdbDataFReaderOpen(): closes all of its file
 * handles, frees its scratch buffers and the reader itself, and resets
 * *ppReader to NULL. Does nothing when *ppReader is already NULL.
 */
void tsdbDataFReaderClose(SDataFReader **ppReader) {
  SDataFReader *pReader = *ppReader;
  if (pReader == NULL) {
    return;
  }

  // head, data, sma
  tsdbCloseFile(&pReader->pHeadFD);
  tsdbCloseFile(&pReader->pDataFD);
  tsdbCloseFile(&pReader->pSmaFD);

  // stt: every slot of the array is inspected, not just the ones in use
  for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
    if (pReader->aSttFD[iStt]) {
      tsdbCloseFile(&pReader->aSttFD[iStt]);
    }
  }

  // scratch buffers
  for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }

  taosMemoryFree(pReader);
  *ppReader = NULL;
}
663

664
/**
 * Load the block index section of the head file into `aBlockIdx`.
 *
 * The section spans from pHeadF->offset to pHeadF->size. The array is cleared
 * first; an empty section yields an empty array and success.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when decoding does not end
 *         exactly at the section end; otherwise the error of the failing call
 */
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
  int32_t    code = 0;
  int32_t    lino;
  SHeadFile *pHead = pReader->pSet->pHeadF;
  int64_t    secStart = pHead->offset;
  int64_t    secSize = pHead->size - secStart;

  taosArrayClear(aBlockIdx);
  if (secSize == 0) {
    return code;
  }

  // alloc
  code = tRealloc(&pReader->aBuf[0], secSize);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pHeadFD, secStart, pReader->aBuf[0], secSize, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode: each call consumes one entry and reports its encoded length
  int64_t used = 0;
  while (used < secSize) {
    SBlockIdx entry;
    used += tGetBlockIdx(pReader->aBuf[0] + used, &entry);

    if (taosArrayPush(aBlockIdx, &entry) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }
  if (used != secSize) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
706

707
/**
 * Load the block list of stt file number `iStt` into `aSttBlk`.
 *
 * The section spans from the stt file's offset to its size. The array is
 * cleared first; an empty section yields an empty array and success.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when decoding does not end
 *         exactly at the section end; otherwise the error of the failing call
 */
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
  int32_t   code = 0;
  int32_t   lino;
  SSttFile *pStt = pReader->pSet->aSttF[iStt];
  int64_t   secStart = pStt->offset;
  int64_t   secSize = pStt->size - secStart;

  taosArrayClear(aSttBlk);
  if (secSize == 0) {
    return 0;
  }

  // alloc
  code = tRealloc(&pReader->aBuf[0], secSize);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->aSttFD[iStt], secStart, pReader->aBuf[0], secSize, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode: each call consumes one entry and reports its encoded length
  int64_t used = 0;
  while (used < secSize) {
    SSttBlk entry;
    used += tGetSttBlk(pReader->aBuf[0] + used, &entry);

    if (taosArrayPush(aSttBlk, &entry) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }
  if (used != secSize) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
749

750
/**
 * Load the data-block map referenced by `pBlockIdx` from the head file and
 * decode it into `mDataBlk`.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the decoded length
 *         differs from pBlockIdx->size; otherwise the error of the failing
 *         call
 */
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
  int32_t code = 0;
  int32_t lino;
  int64_t offset = pBlockIdx->offset;
  int64_t size = pBlockIdx->size;

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode
  int32_t n;
  code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
  // Leave through the macro like every other exit in this function; a bare
  // goto would reach the error log without `lino` having been assigned.
  TSDB_CHECK_CODE(code, lino, _exit);
  if (n != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
780

781
// SDelFReader ====================================================
782
// State of one reader over a del file, created by tsdbDelFReaderOpen() and
// destroyed by tsdbDelFReaderClose().
struct SDelFReader {
783
  STsdb   *pTsdb;  // tsdb passed to tsdbDelFReaderOpen()
784
  SDelFile fDel;  // by-value copy of the del file descriptor given at open
785
  STsdbFD *pReadH;  // read-only handle on the del file
786
  uint8_t *aBuf[1];  // scratch buffer grown with tRealloc() by the read functions, released at close
787
};
788

789
/**
 * Create a reader for one del file.
 *
 * The reader stores `pTsdb`, a by-value copy of `*pFile`, and a read-only
 * file handle for the del file's path.
 *
 * @param ppReader receives the reader, or NULL on failure
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
  int32_t      lino = 0;
  char         fname[TSDB_FILENAME_LEN];
  SDelFReader *pNew = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pNew));

  if (pNew == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pNew->pTsdb = pTsdb;
  pNew->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pNew->pReadH, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    *ppReader = pNew;
    return code;
  }

  *ppReader = NULL;
  TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
  taosMemoryFree(pNew);
  return code;
}
819

820
/**
 * Destroy a reader created by tsdbDelFReaderOpen(): closes its file handle,
 * frees its scratch buffers and the reader itself. *ppReader is always reset
 * to NULL, also when it already was NULL.
 */
void tsdbDelFReaderClose(SDelFReader **ppReader) {
  SDelFReader *pReader = *ppReader;

  if (pReader) {
    tsdbCloseFile(&pReader->pReadH);
    for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
      tFree(pReader->aBuf[iBuf]);
    }
    taosMemoryFree(pReader);
  }

  *ppReader = NULL;
}
×
834

835
/**
 * Read all del-data records referenced by `pDelIdx` into `aDelData`.
 * Equivalent to tsdbReadDelDatav1() with INT64_MAX as the version ceiling.
 */
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
  const int64_t noVersionLimit = INT64_MAX;

  return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, noVersionLimit);
}
838

839
/**
 * Read the del-data records referenced by `pDelIdx` into `aDelData`, keeping
 * only records whose version does not exceed `maxVer`.
 *
 * The array is cleared first. Records above the ceiling are still decoded,
 * since their encoded length is needed to advance, but are not stored.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when decoding does not end
 *         exactly at pDelIdx->size; otherwise the error of the failing call
 */
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
  int32_t code = 0;
  int32_t lino;
  int64_t secStart = pDelIdx->offset;
  int64_t secSize = pDelIdx->size;
  int64_t used;

  taosArrayClear(aDelData);

  // alloc
  code = tRealloc(&pReader->aBuf[0], secSize);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pReadH, secStart, pReader->aBuf[0], secSize, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode
  for (used = 0; used < secSize;) {
    SDelData record;
    used += tGetDelData(pReader->aBuf[0] + used, &record);

    if (record.version <= maxVer && taosArrayPush(aDelData, &record) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  if (used != secSize) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
882

883
/**
 * Load the del index section of the del file into `aDelIdx`.
 *
 * The section spans from fDel.offset to fDel.size. The array is cleared
 * first.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when decoding does not end
 *         exactly at the section end; otherwise the error of the failing call
 */
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
  int32_t code = 0;
  int32_t lino;
  int64_t n;  // same width as `size`, matching tsdbReadDelDatav1
  int64_t offset = pReader->fDel.offset;
  int64_t size = pReader->fDel.size - offset;

  taosArrayClear(aDelIdx);

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode
  n = 0;
  while (n < size) {
    SDelIdx delIdx;

    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);

    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  if (n != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc