• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4871

04 Dec 2025 01:55AM UTC coverage: 64.654% (+0.1%) from 64.545%
#4871

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

869 of 2219 new or added lines in 36 files covered. (39.16%)

441 existing lines in 120 files now uncovered.

159620 of 246882 relevant lines covered (64.65%)

110922946.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.97
/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "tss.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20
#include "vnd.h"
21

22
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
248,001,477✔
23
  int32_t     code = 0;
248,001,477✔
24
  int32_t     lino;
25
  const char *path = pFD->path;
248,001,477✔
26
  int32_t     szPage = pFD->szPage;
248,027,707✔
27
  int32_t     flag = pFD->flag;
248,040,394✔
28
  int64_t     lc_size = 0;
248,047,329✔
29

30
  pFD->pFD = taosOpenFile(path, flag);
248,058,109✔
31
  if (pFD->pFD == NULL) {
248,032,480✔
32
    if (TD_FILE_READ == flag) {
×
33
      int32_t expired = grantCheck(TSDB_GRANT_SHARED_STORAGE);
×
34
      if (expired && tsSsEnabled) {
×
35
        tsdbWarn("s3 grant expired: %d", expired);
×
36
        tsSsEnabled = false;
×
37
      } else if (!expired && tsSsEnabled) {
×
38
        tsSsEnabled = true;
×
39
      }
40
    }
41

42
    if (tsSsEnabled && pFD->lcn > 1 && !strncmp(path + strlen(path) - 5, ".data", 5)) {
×
43
      char lc_path[TSDB_FILENAME_LEN];
×
44
      tstrncpy(lc_path, path, TSDB_FQDN_LEN);
×
45

46
      int32_t     vid = 0;
×
47
      const char *object_name = taosDirEntryBaseName((char *)path);
×
48
      (void)sscanf(object_name, "v%df%dver%" PRId64, &vid, &pFD->fid, &pFD->cid);
×
49

50
      char *dot = strrchr(lc_path, '.');
×
51
      if (!dot) {
×
52
        tsdbError("unexpected path: %s", lc_path);
×
53
        TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
×
54
      }
55
      snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lc_path), "%d.data", pFD->lcn);
×
56

57
      pFD->pFD = taosOpenFile(lc_path, flag);
×
58
      if (pFD->pFD == NULL) {
×
59
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
60
      }
61
      if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
×
62
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
63
      }
64
    } else {
65
      tsdbInfo("no file: %s", path);
×
66
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
67
    }
68
    pFD->ssFile = 1;
×
69
  }
70

71
  pFD->pBuf = taosMemoryCalloc(1, szPage);
248,045,857✔
72
  if (pFD->pBuf == NULL) {
248,017,698✔
73
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
74
  }
75

76
  if (lc_size > 0) {
248,024,120✔
77
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
78
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
79

80
    pFD->szFile = lc_size + chunksize * (pFD->lcn - 1);
×
81
  }
82

83
  // not check file size when reading data files.
84
  if (flag != TD_FILE_READ /* && !pFD->s3File*/) {
248,024,120✔
85
    if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
17,793,954✔
86
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
87
    }
88
  }
89

90
  if (pFD->szFile % szPage != 0) {
248,014,537✔
91
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
92
  }
93
  pFD->szFile = pFD->szFile / szPage;
248,031,683✔
94

95
_exit:
248,060,197✔
96
  if (code) {
248,069,227✔
97
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
98
  }
99
  return code;
248,069,227✔
100
}
101

102
// =============== PAGE-WISE FILE ===============
103
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
257,070,176✔
104
  int32_t  code = 0;
257,070,176✔
105
  int32_t  lino;
106
  STsdbFD *pFD = NULL;
257,070,176✔
107
  int32_t  szPage = pTsdb->pVnode->config.tsdbPageSize;
257,070,176✔
108

109
  *ppFD = NULL;
257,096,864✔
110

111
  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
257,115,909✔
112
  if (pFD == NULL) {
256,980,102✔
113
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
114
  }
115

116
  pFD->path = (char *)&pFD[1];
256,980,102✔
117
  memcpy(pFD->path, path, strlen(path) + 1);
256,988,343✔
118
  pFD->szPage = szPage;
257,093,990✔
119
  pFD->flag = flag;
257,107,630✔
120
  pFD->szPage = szPage;
257,092,766✔
121
  pFD->pgno = 0;
257,071,501✔
122
  pFD->lcn = lcn;
257,113,677✔
123
  pFD->pTsdb = pTsdb;
257,108,001✔
124

125
  *ppFD = pFD;
257,082,185✔
126

127
_exit:
257,096,934✔
128
  if (code) {
257,096,934✔
129
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
×
130
  }
131
  return code;
257,051,756✔
132
}
133

134
void tsdbCloseFile(STsdbFD **ppFD) {
257,094,135✔
135
  STsdbFD *pFD = *ppFD;
257,094,135✔
136
  if (pFD) {
257,106,025✔
137
    taosMemoryFree(pFD->pBuf);
257,113,260✔
138
    int32_t code = taosCloseFile(&pFD->pFD);
257,090,740✔
139
    if (code) {
257,050,523✔
140
      tsdbError("failed to close file: %s, code:%d reason:%s", pFD->path, code, tstrerror(code));
×
141
    } else {
142
      tsdbTrace("close file: %s", pFD->path);
257,050,523✔
143
    }
144
    taosMemoryFree(pFD);
257,035,498✔
145
    *ppFD = NULL;
257,084,624✔
146
  }
147
}
257,079,634✔
148

149
static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
604,585,359✔
150
  int32_t code = 0;
604,585,359✔
151
  int32_t lino;
152

153
  if (!pFD->pFD) {
604,585,359✔
154
    code = tsdbOpenFileImpl(pFD);
17,792,011✔
155
    TSDB_CHECK_CODE(code, lino, _exit);
17,783,410✔
156
  }
157

158
  if (pFD->pgno > 0) {
604,594,416✔
159
    int64_t offset = PAGE_OFFSET(pFD->pgno, pFD->szPage);
586,815,783✔
160
    if (pFD->ssFile && pFD->lcn > 1) {
586,813,164✔
161
      SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
162
      int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
163
      int64_t    chunkoffset = chunksize * (pFD->lcn - 1);
×
164

165
      offset -= chunkoffset;
×
166
    }
167

168
    int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
586,817,498✔
169
    if (n < 0) {
586,809,245✔
170
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
171
    }
172

173
    code = taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
586,809,245✔
174
    TSDB_CHECK_CODE(code, lino, _exit);
586,821,965✔
175

176
    if (encryptAlgorithm == DND_CA_SM4) {
586,821,965✔
177
      // if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_TSDB) == DND_CS_TSDB){
178
      unsigned char PacketData[128];
1,526✔
179
      int           NewLen;
180
      int32_t       count = 0;
1,526✔
181
      while (count < pFD->szPage) {
50,358✔
182
        SCryptOpts opts = {0};
48,832✔
183
        opts.len = 128;
48,832✔
184
        opts.source = pFD->pBuf + count;
48,832✔
185
        opts.result = PacketData;
48,832✔
186
        opts.unitLen = 128;
48,832✔
187
        tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
48,832✔
188

189
        NewLen = CBC_Encrypt(&opts);
48,832✔
190

191
        memcpy(pFD->pBuf + count, PacketData, NewLen);
48,832✔
192
        count += NewLen;
48,832✔
193
      }
194
      // tsdbDebug("CBC_Encrypt count:%d %s", count, __FUNCTION__);
195
    }
196

197
    n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
586,821,965✔
198
    if (n < 0) {
586,808,787✔
199
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
200
    }
201

202
    if (pFD->szFile < pFD->pgno) {
586,808,787✔
203
      pFD->szFile = pFD->pgno;
586,333,448✔
204
    }
205
  }
206
  pFD->pgno = 0;
604,609,649✔
207

208
_exit:
604,609,707✔
209
  if (code) {
604,609,707✔
210
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
211
  }
212
  return code;
604,590,330✔
213
}
214

215
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgorithm, char *encryptKey) {
522,422,984✔
216
  int32_t code = 0;
522,422,984✔
217
  int32_t lino;
218

219
  if (!pFD->pFD) {
522,422,984✔
220
    code = tsdbOpenFileImpl(pFD);
×
221
    TSDB_CHECK_CODE(code, lino, _exit);
×
222
  }
223

224
  int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
522,455,151✔
225
  if (pFD->lcn > 1) {
522,462,704✔
226
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
227
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
228
    int64_t    chunkoffset = chunksize * (pFD->lcn - 1);
×
229

230
    offset -= chunkoffset;
×
231
  }
232

233
  // seek
234
  int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
522,472,701✔
235
  if (n < 0) {
522,410,486✔
236
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
237
  }
238

239
  // read
240
  n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
522,410,486✔
241
  if (n < 0) {
522,408,180✔
UNCOV
242
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
243
  } else if (n < pFD->szPage) {
522,408,180✔
244
    tsdbError(
×
245
        "vgId:%d %s failed at %s:%d since read file size is less than page size, "
246
        "read size: %" PRId64 ", page size: %d, fname:%s, pgno:%" PRId64,
247
        TD_VID(pFD->pTsdb->pVnode), __func__, __FILE__, __LINE__, n, pFD->szPage, pFD->path, pFD->pgno);
248
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
249
  }
250
  //}
251

252
  if (encryptAlgorithm == DND_CA_SM4) {
522,460,206✔
253
    // if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_TSDB) == DND_CS_TSDB){
254
    unsigned char PacketData[128];
×
255
    int           NewLen;
256

257
    int32_t count = 0;
×
258
    while (count < pFD->szPage) {
×
259
      SCryptOpts opts = {0};
×
260
      opts.len = 128;
×
261
      opts.source = pFD->pBuf + count;
×
262
      opts.result = PacketData;
×
263
      opts.unitLen = 128;
×
264
      tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
×
265

266
      NewLen = CBC_Decrypt(&opts);
×
267

268
      memcpy(pFD->pBuf + count, PacketData, NewLen);
×
269
      count += NewLen;
×
270
    }
271
    // tsdbDebug("CBC_Decrypt count:%d %s", count, __FUNCTION__);
272
  }
273

274
  // check
275
  if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
820,839,095✔
276
    tsdbError("vgId:%d %s failed at %s:%d since checksum mismatch, fname:%s, pgno:%" PRId64, TD_VID(pFD->pTsdb->pVnode),
×
277
              __func__, __FILE__, __LINE__, pFD->path, pgno);
278
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
279
  }
280

281
  pFD->pgno = pgno;
522,468,910✔
282

283
_exit:
522,467,756✔
284
  if (code) {
522,467,756✔
285
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
286
  }
287
  return code;
522,381,517✔
288
}
289

290
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
459,169,112✔
291
                      char *encryptKey) {
292
  int32_t code = 0;
459,169,112✔
293
  int32_t lino;
294
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
459,169,112✔
295
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
459,212,549✔
296
  int64_t bOffset = fOffset % pFD->szPage;
459,223,154✔
297
  int64_t n = 0;
459,231,111✔
298

299
  do {
300
    if (pFD->pgno != pgno) {
1,028,230,235✔
301
      code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
586,801,761✔
302
      TSDB_CHECK_CODE(code, lino, _exit);
586,796,856✔
303

304
      if (pgno <= pFD->szFile) {
586,796,856✔
305
        code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
486,342✔
306
        TSDB_CHECK_CODE(code, lino, _exit);
486,342✔
307
      } else {
308
        pFD->pgno = pgno;
586,321,562✔
309
      }
310
    }
311

312
    int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
1,028,163,526✔
313
    memcpy(pFD->pBuf + bOffset, pBuf + n, nWrite);
1,028,176,666✔
314

315
    pgno++;
1,028,155,857✔
316
    bOffset = 0;
1,028,155,857✔
317
    n += nWrite;
1,028,155,857✔
318
  } while (n < size);
1,028,155,857✔
319

320
_exit:
459,156,733✔
321
  if (code) {
459,156,733✔
322
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
323
  }
324
  return code;
459,161,914✔
325
}
326

327
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
1,541,582,949✔
328
                               char *encryptKey) {
329
  int32_t code = 0;
1,541,582,949✔
330
  int32_t lino;
331
  int64_t n = 0;
1,541,582,949✔
332
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
1,541,582,949✔
333
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
1,541,665,291✔
334
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
1,541,774,228✔
335
  int64_t bOffset = fOffset % pFD->szPage;
1,541,771,891✔
336

337
  if (bOffset >= szPgCont) {
1,541,738,677✔
338
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
339
  }
340

341
  while (n < size) {
2,147,483,647✔
342
    if (pFD->pgno != pgno) {
1,749,946,623✔
343
      code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
521,965,136✔
344
      TSDB_CHECK_CODE(code, lino, _exit);
521,920,781✔
345
    }
346

347
    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
1,749,955,886✔
348
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
1,749,955,886✔
349

350
    n += nRead;
1,749,873,997✔
351
    pgno++;
1,749,873,997✔
352
    bOffset = 0;
1,749,873,997✔
353
  }
354

355
_exit:
1,541,666,051✔
356
  if (code) {
1,541,666,051✔
357
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
358
  }
359
  return code;
1,541,613,283✔
360
}
361

362

363
#ifdef USE_SHARED_STORAGE
364
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, uint8_t **ppBlock) {
×
365
  int32_t code = 0, lino, vid = TD_VID(pFD->pTsdb->pVnode);
×
366

367
  char* buf = taosMemoryCalloc(1, size);
×
368
  if (buf == NULL) {
×
369
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
370
  }
371

372
  SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
373
  int64_t szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
374

375
  for (int64_t n = 0, nRead = 0; n < size; n += nRead, offset += nRead) {
×
376
    int chunk = offset / szChunk + 1;
×
377

378
    if (chunk >= pFD->lcn) {
×
379
      if (taosLSeekFile(pFD->pFD, offset - szChunk * (pFD->lcn - 1), SEEK_SET) < 0) {
×
380
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
381
      }
382

383
      int64_t toRead = size - n;
×
384
      nRead = taosReadFile(pFD->pFD, buf + n, toRead);
×
385
      if (nRead < 0) {
×
386
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
387
      } else if (nRead < toRead) {
×
388
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
389
      }
390
    } else if (tsSsEnabled) {
×
391
      int64_t toRead = TMIN(szChunk - offset % szChunk, size - n);
×
392
      nRead = toRead;
×
393

394
      char path[TSDB_FILENAME_LEN];
×
395
      sprintf(path, "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, pFD->fid, vid, pFD->fid, pFD->cid, chunk);
×
396

397
      code = tssReadFileFromDefault(path, offset % szChunk, buf + n, &nRead);
×
398
      TSDB_CHECK_CODE(code, lino, _exit);
×
399
      if (nRead < toRead) {
×
400
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
401
      }
402
    } else {
403
      TSDB_CHECK_CODE(code = TSDB_CODE_OPS_NOT_SUPPORT, lino, _exit);
×
404
    }
405
  }
406

407
_exit:
×
408
  if (code) {
×
409
    TSDB_ERROR_LOG(vid, lino, code);
×
410
    taosMemoryFree(buf);
×
411
  } else {
412
    *ppBlock = (uint8_t *)buf;
×
413
  }
414
  return code;
×
415
}
416
#endif
417

418

419

420
static int32_t tsdbReadFileSs(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
×
421
#ifdef USE_SHARED_STORAGE
422
  int32_t code = 0;
×
423
  int32_t lino;
424
  int64_t n = 0;
×
425
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
×
426
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
×
427
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
×
428
  int64_t bOffset = fOffset % pFD->szPage;
×
429

430
  if (bOffset >= szPgCont) {
×
431
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
432
  }
433

434
  // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
435
  // 2, fetch pgnoStart ~ pgnoEnd from s3
436
  // 3, store pgs to pcache & last pg to pFD->pBuf
437
  // 4, deliver pgs to [pBuf, pBuf + size)
438

439
  while (n < size) {
×
440
    if (pFD->pgno != pgno) {
×
441
      LRUHandle *handle = NULL;
×
442
      code = tsdbCacheGetPageSs(pFD->pTsdb->pgCache, pFD, pgno, &handle);
×
443
      if (code != TSDB_CODE_SUCCESS) {
×
444
        if (handle) {
×
445
          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
446
        }
447
        TSDB_CHECK_CODE(code, lino, _exit);
×
448
      }
449

450
      if (!handle) {
×
451
        break;
×
452
      }
453

454
      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
×
455
      memcpy(pFD->pBuf, pPage, pFD->szPage);
×
456
      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
457

458
      // check
459
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
×
460
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
461
      }
462

463
      pFD->pgno = pgno;
×
464
    }
465

466
    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
×
467
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
×
468

469
    n += nRead;
×
470
    ++pgno;
×
471
    bOffset = 0;
×
472
  }
473

474
  if (n < size) {
×
475
    // 2, retrieve pgs from s3
476
    uint8_t *pBlock = NULL;
×
477
    int64_t  retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
×
478
    int64_t  pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;
×
479

480
    if (szHint > 0) {
×
481
      pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
×
482
    }
483

484
    int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
×
485

486
    code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, &pBlock);
×
487
    TSDB_CHECK_CODE(code, lino, _exit);
×
488
    // 3, Store Pages in Cache
489
    int nPage = pgnoEnd - pgno + 1;
×
490
    for (int i = 0; i < nPage; ++i) {
×
491
      if (pFD->szFile != pgno) {  // DONOT cache last volatile page
×
492
        tsdbCacheSetPageSs(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
×
493
      }
494

495
      if (szHint > 0 && n >= size) {
×
496
        ++pgno;
×
497
        continue;
×
498
      }
499
      memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
×
500

501
      // check
502
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
×
503
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
504
      }
505

506
      pFD->pgno = pgno;
×
507

508
      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
×
509
      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
×
510

511
      n += nRead;
×
512
      ++pgno;
×
513
      bOffset = 0;
×
514
    }
515

516
    taosMemoryFree(pBlock);
×
517
  }
518

519
_exit:
×
520
  if (code) {
×
521
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
522
  }
523
  return code;
×
524
#else
525
  return TSDB_CODE_INTERNAL_ERROR;
526
#endif
527
}
528

529
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
1,541,528,233✔
530
                     int32_t encryptAlgorithm, char *encryptKey) {
531
  int32_t code = 0;
1,541,528,233✔
532
  int32_t lino;
533

534
  if (!pFD->pFD) {
1,541,528,233✔
535
    code = tsdbOpenFileImpl(pFD);
230,264,663✔
536
    TSDB_CHECK_CODE(code, lino, _exit);
230,247,772✔
537
  }
538

539
  if (pFD->ssFile && pFD->lcn > 1 /* && tsSsBlockSize < 0*/) {
1,541,604,808✔
540
    code = tsdbReadFileSs(pFD, offset, pBuf, size, szHint);
×
541
    TSDB_CHECK_CODE(code, lino, _exit);
×
542
  } else {
543
    code = tsdbReadFileImp(pFD, offset, pBuf, size, encryptAlgorithm, encryptKey);
1,541,649,868✔
544
    TSDB_CHECK_CODE(code, lino, _exit);
1,541,617,684✔
545
  }
546

547
_exit:
1,541,617,684✔
548
  if (code) {
1,541,617,684✔
549
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
550
  }
551
  return code;
1,541,596,284✔
552
}
553

554
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
888,074,300✔
555
                             int32_t encryptAlgorithm, char *encryptKey) {
556
  int32_t code;
557
  int32_t lino;
558

559
  code = tBufferEnsureCapacity(buffer, buffer->size + size);
888,074,300✔
560
  TSDB_CHECK_CODE(code, lino, _exit);
888,165,979✔
561

562
  code = tsdbReadFile(pFD, offset, (uint8_t *)tBufferGetDataEnd(buffer), size, szHint, encryptAlgorithm, encryptKey);
888,165,979✔
563
  TSDB_CHECK_CODE(code, lino, _exit);
888,246,867✔
564

565
  buffer->size += size;
888,246,867✔
566

567
_exit:
888,269,233✔
568
  if (code) {
888,269,233✔
569
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
570
  }
571
  return code;
888,198,682✔
572
}
573

574
int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
17,790,402✔
575
  int32_t code = 0;
17,790,402✔
576
  int32_t lino;
577

578
  code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
17,790,402✔
579
  TSDB_CHECK_CODE(code, lino, _exit);
17,795,914✔
580

581
  if (taosFsyncFile(pFD->pFD) < 0) {
17,795,914✔
582
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ERRNO), lino, _exit);
×
583
  }
584

585
_exit:
17,799,744✔
586
  if (code) {
17,799,744✔
587
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
588
  }
589
  return code;
17,799,613✔
590
}
591

592
// SDataFReader ====================================================
593
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
×
594
  int32_t       code = 0;
×
595
  int32_t       lino = 0;
×
596
  SDataFReader *pReader = NULL;
×
597
  int32_t       szPage = pTsdb->pVnode->config.tsdbPageSize;
×
598
  char          fname[TSDB_FILENAME_LEN];
×
599

600
  // alloc
601
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
×
602
  if (pReader == NULL) {
×
603
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
604
  }
605
  pReader->pTsdb = pTsdb;
×
606
  pReader->pSet = pSet;
×
607

608
  // head
609
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
×
610
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD, 0);
×
611
  TSDB_CHECK_CODE(code, lino, _exit);
×
612

613
  // data
614
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
×
615
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD, 0);
×
616
  TSDB_CHECK_CODE(code, lino, _exit);
×
617

618
  // sma
619
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
×
620
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD, 0);
×
621
  TSDB_CHECK_CODE(code, lino, _exit);
×
622

623
  // stt
624
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
×
625
    tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
×
626
    code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt], 0);
×
627
    TSDB_CHECK_CODE(code, lino, _exit);
×
628
  }
629

630
_exit:
×
631
  if (code) {
×
632
    *ppReader = NULL;
×
633
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
634

635
    if (pReader) {
×
636
      for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
×
637
      tsdbCloseFile(&pReader->pSmaFD);
×
638
      tsdbCloseFile(&pReader->pDataFD);
×
639
      tsdbCloseFile(&pReader->pHeadFD);
×
640
      taosMemoryFree(pReader);
×
641
    }
642
  } else {
643
    *ppReader = pReader;
×
644
  }
645
  return code;
×
646
}
647

648
void tsdbDataFReaderClose(SDataFReader **ppReader) {
×
649
  if (*ppReader == NULL) return;
×
650

651
  // head
652
  tsdbCloseFile(&(*ppReader)->pHeadFD);
×
653

654
  // data
655
  tsdbCloseFile(&(*ppReader)->pDataFD);
×
656

657
  // sma
658
  tsdbCloseFile(&(*ppReader)->pSmaFD);
×
659

660
  // stt
661
  for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
×
662
    if ((*ppReader)->aSttFD[iStt]) {
×
663
      tsdbCloseFile(&(*ppReader)->aSttFD[iStt]);
×
664
    }
665
  }
666

667
  for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) {
×
668
    tFree((*ppReader)->aBuf[iBuf]);
×
669
  }
670
  taosMemoryFree(*ppReader);
×
671
  *ppReader = NULL;
×
672
}
673

674
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
×
675
  int32_t    code = 0;
×
676
  int32_t    lino;
677
  SHeadFile *pHeadFile = pReader->pSet->pHeadF;
×
678
  int64_t    offset = pHeadFile->offset;
×
679
  int64_t    size = pHeadFile->size - offset;
×
680

681
  taosArrayClear(aBlockIdx);
×
682
  if (size == 0) {
×
683
    return code;
×
684
  }
685

686
  // alloc
687
  code = tRealloc(&pReader->aBuf[0], size);
×
688
  TSDB_CHECK_CODE(code, lino, _exit);
×
689

690
  // read
691
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
692
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
693
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
694
  TSDB_CHECK_CODE(code, lino, _exit);
×
695

696
  // decode
697
  int64_t n = 0;
×
698
  while (n < size) {
×
699
    SBlockIdx blockIdx;
×
700
    n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
×
701

702
    if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
×
703
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
704
    }
705
  }
706
  if (n != size) {
×
707
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
708
  }
709

710
_exit:
×
711
  if (code) {
×
712
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
713
  }
714
  return code;
×
715
}
716

717
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
×
718
  int32_t   code = 0;
×
719
  int32_t   lino;
720
  SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
×
721
  int64_t   offset = pSttFile->offset;
×
722
  int64_t   size = pSttFile->size - offset;
×
723

724
  taosArrayClear(aSttBlk);
×
725
  if (size == 0) {
×
726
    return 0;
×
727
  }
728

729
  // alloc
730
  code = tRealloc(&pReader->aBuf[0], size);
×
731
  TSDB_CHECK_CODE(code, lino, _exit);
×
732

733
  // read
734
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
735
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
736
  code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
737
  TSDB_CHECK_CODE(code, lino, _exit);
×
738

739
  // decode
740
  int64_t n = 0;
×
741
  while (n < size) {
×
742
    SSttBlk sttBlk;
×
743
    n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk);
×
744

745
    if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
×
746
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
747
    }
748
  }
749
  if (n != size) {
×
750
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
751
  }
752

753
_exit:
×
754
  if (code) {
×
755
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
756
  }
757
  return code;
×
758
}
759

760
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
×
761
  int32_t code = 0;
×
762
  int32_t lino;
763
  int64_t offset = pBlockIdx->offset;
×
764
  int64_t size = pBlockIdx->size;
×
765

766
  // alloc
767
  code = tRealloc(&pReader->aBuf[0], size);
×
768
  TSDB_CHECK_CODE(code, lino, _exit);
×
769

770
  // read
771
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
772
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
773
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
774
  TSDB_CHECK_CODE(code, lino, _exit);
×
775

776
  // decode
777
  int32_t n;
×
778
  code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
×
779
  if (code) goto _exit;
×
780
  if (n != size) {
×
781
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
782
  }
783

784
_exit:
×
785
  if (code) {
×
786
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
787
  }
788
  return code;
×
789
}
790

791
// SDelFReader ====================================================
792
struct SDelFReader {
793
  STsdb   *pTsdb;
794
  SDelFile fDel;
795
  STsdbFD *pReadH;
796
  uint8_t *aBuf[1];
797
};
798

799
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
×
800
  int32_t      code = 0;
×
801
  int32_t      lino = 0;
×
802
  char         fname[TSDB_FILENAME_LEN];
×
803
  SDelFReader *pDelFReader = NULL;
×
804

805
  // alloc
806
  pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
×
807
  if (pDelFReader == NULL) {
×
808
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
809
  }
810

811
  // open impl
812
  pDelFReader->pTsdb = pTsdb;
×
813
  pDelFReader->fDel = *pFile;
×
814

815
  tsdbDelFileName(pTsdb, pFile, fname);
×
816
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH, 0);
×
817
  TSDB_CHECK_CODE(code, lino, _exit);
×
818

819
_exit:
×
820
  if (code) {
×
821
    *ppReader = NULL;
×
822
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
×
823
    taosMemoryFree(pDelFReader);
×
824
  } else {
825
    *ppReader = pDelFReader;
×
826
  }
827
  return code;
×
828
}
829

830
void tsdbDelFReaderClose(SDelFReader **ppReader) {
×
831
  int32_t      code = 0;
×
832
  SDelFReader *pReader = *ppReader;
×
833

834
  if (pReader) {
×
835
    tsdbCloseFile(&pReader->pReadH);
×
836
    for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
×
837
      tFree(pReader->aBuf[iBuf]);
×
838
    }
839
    taosMemoryFree(pReader);
×
840
  }
841

842
  *ppReader = NULL;
×
843
}
×
844

845
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
846
  return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, INT64_MAX);
×
847
}
848

849
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
×
850
  int32_t code = 0;
×
851
  int32_t lino;
852
  int64_t offset = pDelIdx->offset;
×
853
  int64_t size = pDelIdx->size;
×
854
  int64_t n;
855

856
  taosArrayClear(aDelData);
×
857

858
  // alloc
859
  code = tRealloc(&pReader->aBuf[0], size);
×
860
  TSDB_CHECK_CODE(code, lino, _exit);
×
861

862
  // read
863
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
864
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
865
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
866
  TSDB_CHECK_CODE(code, lino, _exit);
×
867

868
  // // decode
869
  n = 0;
×
870
  while (n < size) {
×
871
    SDelData delData;
×
872
    n += tGetDelData(pReader->aBuf[0] + n, &delData);
×
873

874
    if (delData.version > maxVer) {
×
875
      continue;
×
876
    }
877
    if (taosArrayPush(aDelData, &delData) == NULL) {
×
878
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
879
    }
880
  }
881

882
  if (n != size) {
×
883
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
884
  }
885

886
_exit:
×
887
  if (code) {
×
888
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
889
  }
890
  return code;
×
891
}
892

893
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
×
894
  int32_t code = 0;
×
895
  int32_t lino;
896
  int32_t n;
897
  int64_t offset = pReader->fDel.offset;
×
898
  int64_t size = pReader->fDel.size - offset;
×
899

900
  taosArrayClear(aDelIdx);
×
901

902
  // alloc
903
  code = tRealloc(&pReader->aBuf[0], size);
×
904
  TSDB_CHECK_CODE(code, lino, _exit);
×
905

906
  // read
907
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
908
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
909
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
910
  TSDB_CHECK_CODE(code, lino, _exit);
×
911

912
  // decode
913
  n = 0;
×
914
  while (n < size) {
×
915
    SDelIdx delIdx;
×
916

917
    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
×
918

919
    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
×
920
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
921
    }
922
  }
923

924
  if (n != size) {
×
925
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
926
  }
927

928
_exit:
×
929
  if (code) {
×
930
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
931
  }
932
  return code;
×
933
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc