• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3818

01 Apr 2025 07:46AM UTC coverage: 34.065% (-0.02%) from 34.08%
#3818

push

travis-ci

happyguoxy
test:alter gcda dir

148531 of 599532 branches covered (24.77%)

Branch coverage included in aggregate %.

222425 of 489446 relevant lines covered (45.44%)

762721.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.31
/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "tcs.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20
#include "vnd.h"
21

22
/**
 * Open the file behind `pFD` on first use and allocate its page buffer.
 *
 * The plain path is tried first. If that fails while S3 is enabled,
 * `pFD->lcn > 1` and the path ends in ".data", the last-chunk file
 * "<stem>.<lcn>.data" is opened instead and the handle is flagged as an
 * S3 file.
 *
 * On success `pFD->szFile` is expressed in pages. On failure the file handle
 * and the page buffer are released again, so that a later call retries the
 * open instead of running with a half-initialized descriptor.
 *
 * @param pFD descriptor created by tsdbOpenFile()
 * @return 0 on success, otherwise an error code
 */
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
  int32_t     code = 0;
  int32_t     lino = 0;
  const char *path = pFD->path;
  int32_t     szPage = pFD->szPage;
  int32_t     flag = pFD->flag;
  int64_t     lc_size = 0;

  pFD->pFD = taosOpenFile(path, flag);
  if (pFD->pFD == NULL) {
    size_t pathLen = strlen(path);

    // pathLen >= 5 keeps the suffix comparison inside the string.
    if (tsS3Enabled && pFD->lcn > 1 && pathLen >= 5 && !strncmp(path + pathLen - 5, ".data", 5)) {
      char lc_path[TSDB_FILENAME_LEN];
      // Bound every write by the real size of lc_path.
      tstrncpy(lc_path, path, sizeof(lc_path));

      int32_t     vid = 0;
      const char *object_name = taosDirEntryBaseName((char *)path);
      (void)sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid);

      char *dot = strrchr(lc_path, '.');
      if (!dot) {
        tsdbError("unexpected path: %s", lc_path);
        TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
      }
      snprintf(dot + 1, sizeof(lc_path) - (size_t)(dot + 1 - lc_path), "%d.data", pFD->lcn);

      pFD->pFD = taosOpenFile(lc_path, flag);
      if (pFD->pFD == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
      if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
    } else {
      tsdbInfo("no file: %s", path);
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
    pFD->s3File = 1;
  }

  pFD->pBuf = taosMemoryCalloc(1, szPage);
  if (pFD->pBuf == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  if (lc_size > 0) {
    // Logical size = all preceding chunks plus the local last chunk.
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;

    pFD->szFile = lc_size + chunksize * (pFD->lcn - 1);
  }

  // not check file size when reading data files.
  if (flag != TD_FILE_READ /* && !pFD->s3File*/) {
    if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  if (pFD->szFile % szPage != 0) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }
  pFD->szFile = pFD->szFile / szPage;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);

    // Roll back partial initialization: callers use `pFD->pFD != NULL` to
    // decide that the descriptor is ready.
    taosMemoryFree(pFD->pBuf);
    pFD->pBuf = NULL;
    (void)taosCloseFile(&pFD->pFD);
    pFD->pFD = NULL;
  }
  return code;
}
91

92
// =============== PAGE-WISE FILE ===============
93
/**
 * Allocate and initialize a page-wise file descriptor for `path`.
 *
 * The descriptor and a copy of the path live in one allocation. Nothing is
 * opened here; this function only fills in the descriptor fields.
 *
 * @param path  file path, copied into the descriptor
 * @param pTsdb owning tsdb; supplies the page size
 * @param flag  open flags stored in the descriptor
 * @param ppFD  out: new descriptor, or NULL on failure
 * @param lcn   value stored in `pFD->lcn`
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
  int32_t  code = 0;
  int32_t  lino = 0;
  STsdbFD *pFD = NULL;
  int32_t  szPage = pTsdb->pVnode->config.tsdbPageSize;
  size_t   szPath = strlen(path) + 1;  // includes the terminating NUL

  *ppFD = NULL;

  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + szPath);
  if (pFD == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // The path is stored immediately after the struct in the same allocation.
  pFD->path = (char *)&pFD[1];
  memcpy(pFD->path, path, szPath);
  pFD->szPage = szPage;
  pFD->flag = flag;
  pFD->pgno = 0;
  pFD->lcn = lcn;
  pFD->pTsdb = pTsdb;

  *ppFD = pFD;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
  }
  return code;
}
123

124
/**
 * Release a descriptor created by tsdbOpenFile(): free the page buffer,
 * close the underlying file and free the descriptor itself.
 *
 * Does nothing when `*ppFD` is NULL; otherwise `*ppFD` is reset to NULL.
 */
void tsdbCloseFile(STsdbFD **ppFD) {
  STsdbFD *fd = *ppFD;
  if (fd == NULL) {
    return;
  }

  taosMemoryFree(fd->pBuf);

  int32_t rc = taosCloseFile(&fd->pFD);
  if (rc == 0) {
    tsdbTrace("close file: %s", fd->path);
  } else {
    tsdbError("failed to close file: %s, code:%d reason:%s", fd->path, rc, tstrerror(rc));
  }

  taosMemoryFree(fd);
  *ppFD = NULL;
}
3,959✔
138

139
/**
 * Flush the buffered page (if any) of `pFD` to disk.
 *
 * When `pFD->pgno > 0` the page checksum is appended, the page is optionally
 * SM4-encrypted in 128-byte units, and the page is written at its file
 * offset. For S3 files with `lcn > 1` the offset is made relative to the
 * local last chunk. On success `pFD->pgno` is reset to 0.
 *
 * @return 0 on success, otherwise an error code
 */
static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (pFD->pFD == NULL) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pFD->pgno > 0) {
    int64_t pos = PAGE_OFFSET(pFD->pgno, pFD->szPage);
    if (pFD->s3File && pFD->lcn > 1) {
      SVnodeCfg *cfg = &pFD->pTsdb->pVnode->config;
      int64_t    szChunk = (int64_t)cfg->tsdbPageSize * cfg->s3ChunkSize;

      pos -= szChunk * (pFD->lcn - 1);
    }

    int64_t ret = taosLSeekFile(pFD->pFD, pos, SEEK_SET);
    if (ret < 0) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (encryptAlgorithm == DND_CA_SM4) {
      // Encrypt the page in place, one 128-byte unit at a time.
      unsigned char cipher[128];
      int32_t       done = 0;

      while (done < pFD->szPage) {
        SCryptOpts opts = {0};
        opts.len = 128;
        opts.source = pFD->pBuf + done;
        opts.result = cipher;
        opts.unitLen = 128;
        tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);

        int produced = CBC_Encrypt(&opts);

        memcpy(pFD->pBuf + done, cipher, produced);
        done += produced;
      }
    }

    ret = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
    if (ret < 0) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // The file grows when the flushed page lies past the recorded end.
    if (pFD->szFile < pFD->pgno) {
      pFD->szFile = pFD->pgno;
    }
  }
  pFD->pgno = 0;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
204

205
/**
 * Load page `pgno` of `pFD` into the page buffer.
 *
 * Seeks to the page offset (made chunk-relative when `pFD->lcn > 1`), reads
 * one full page, optionally SM4-decrypts it in 128-byte units, and verifies
 * the page checksum for every page except page 1. On success
 * `pFD->pgno` is set to `pgno`.
 *
 * NOTE(review): the offset adjustment here tests only `lcn > 1`, while the
 * write path also tests `s3File` -- confirm this difference is intended.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED on a short read or checksum
 *         mismatch; otherwise the error reported by the failing call
 */
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (pFD->pFD == NULL) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int64_t pos = PAGE_OFFSET(pgno, pFD->szPage);
  if (pFD->lcn > 1) {
    SVnodeCfg *cfg = &pFD->pTsdb->pVnode->config;
    int64_t    szChunk = (int64_t)cfg->tsdbPageSize * cfg->s3ChunkSize;

    pos -= szChunk * (pFD->lcn - 1);
  }

  // position the file at the page
  int64_t ret = taosLSeekFile(pFD->pFD, pos, SEEK_SET);
  if (ret < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // fetch exactly one page
  ret = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
  if (ret < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  } else if (ret < pFD->szPage) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (encryptAlgorithm == DND_CA_SM4) {
    // Decrypt the page in place, one 128-byte unit at a time.
    unsigned char plain[128];
    int32_t       done = 0;

    while (done < pFD->szPage) {
      SCryptOpts opts = {0};
      opts.len = 128;
      opts.source = pFD->pBuf + done;
      opts.result = plain;
      opts.unitLen = 128;
      tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);

      int produced = CBC_Decrypt(&opts);

      memcpy(pFD->pBuf + done, plain, produced);
      done += produced;
    }
  }

  // verify the checksum (skipped for page 1)
  if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pFD->pgno = pgno;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
273

274
/**
 * Write `size` bytes from `pBuf` at logical offset `offset`, page by page.
 *
 * Whenever the target page differs from the buffered one, the buffered page
 * is flushed first; the target page is then read back if it lies within the
 * current file size, or simply adopted as the buffered page otherwise. Data
 * is copied into the page buffer only -- the final page remains buffered
 * when this function returns.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
                      char *encryptKey) {
  int32_t code = 0;
  int32_t lino;
  int64_t filePos = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t page = OFFSET_PGNO(filePos, pFD->szPage);
  int64_t inPage = filePos % pFD->szPage;
  int64_t done = 0;

  // The body always runs at least once, even for size == 0.
  for (;;) {
    if (pFD->pgno != page) {
      code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (page > pFD->szFile) {
        // Page does not exist on disk yet: start from the current buffer.
        pFD->pgno = page;
      } else {
        code = tsdbReadFilePage(pFD, page, encryptAlgorithm, encryptKey);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    int64_t chunk = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - inPage, size - done);
    memcpy(pFD->pBuf + inPage, pBuf + done, chunk);

    done += chunk;
    inPage = 0;
    page++;

    if (done >= size) {
      break;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
310

311
/**
 * Read `size` bytes at logical offset `offset` from a local file into
 * `pBuf`, loading one page at a time through the page buffer.
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA when the in-page offset falls
 *         outside the page content area; otherwise a page-read error
 */
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
                               char *encryptKey) {
  int32_t code = 0;
  int32_t lino;
  int64_t done = 0;
  int64_t filePos = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t page = OFFSET_PGNO(filePos, pFD->szPage);
  int32_t szContent = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t inPage = filePos % pFD->szPage;

  if (inPage >= szContent) {
    code = TSDB_CODE_INVALID_PARA;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (; done < size; page++, inPage = 0) {
    // Reuse the buffered page when it is already the one we need.
    if (pFD->pgno != page) {
      code = tsdbReadFilePage(pFD, page, encryptAlgorithm, encryptKey);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    int64_t chunk = TMIN(szContent - inPage, size - done);
    memcpy(pBuf + done, pFD->pBuf + inPage, chunk);
    done += chunk;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
345
#ifdef USE_S3
346
/**
 * Read `size` raw bytes starting at file offset `offset` into a newly
 * allocated block, assembling it chunk by chunk.
 *
 * Chunks numbered below `pFD->lcn` are fetched through tcsGetObjectBlock()
 * under the object name "<node_id>/<stem>.<chunkno>.data"; chunks at or
 * beyond `pFD->lcn` are read from the locally opened file.
 *
 * @param check   forwarded to tcsGetObjectBlock()
 * @param ppBlock out: on success, the block; the caller frees it with
 *                taosMemoryFree(). Left untouched on failure.
 * @return 0 on success, otherwise an error code
 */
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
  int64_t    cOffset = offset % chunksize;
  int64_t    n = 0;
  char      *buf = NULL;

  char   *object_name = taosDirEntryBaseName(pFD->path);
  char    object_name_prefix[TSDB_FILENAME_LEN];
  int32_t node_id = vnodeNodeId(pFD->pTsdb->pVnode);
  // Bound by the real buffer size, not by an unrelated length constant.
  snprintf(object_name_prefix, sizeof(object_name_prefix), "%d/%s", node_id, object_name);

  char *dot = strrchr(object_name_prefix, '.');
  if (!dot) {
    tsdbError("unexpected path: %s", object_name_prefix);
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
  }

  buf = taosMemoryCalloc(1, size);
  if (buf == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  for (int32_t chunkno = offset / chunksize + 1; n < size; ++chunkno) {
    int64_t nRead = TMIN(chunksize - cOffset, size - n);

    if (chunkno >= pFD->lcn) {
      // read last chunk
      int64_t ret = taosLSeekFile(pFD->pFD, chunksize * (chunkno - pFD->lcn) + cOffset, SEEK_SET);
      if (ret < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }

      ret = taosReadFile(pFD->pFD, buf + n, nRead);
      if (ret < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      } else if (ret < nRead) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }
    } else {
      uint8_t *pBlock = NULL;

      // Rewrite the suffix after the last '.' to "<chunkno>.data".
      snprintf(dot + 1, sizeof(object_name_prefix) - (size_t)(dot + 1 - object_name_prefix), "%d.data", chunkno);

      code = tcsGetObjectBlock(object_name_prefix, cOffset, nRead, check, &pBlock);
      TSDB_CHECK_CODE(code, lino, _exit);

      memcpy(buf + n, pBlock, nRead);
      taosMemoryFree(pBlock);
    }

    n += nRead;
    cOffset = 0;  // only the first chunk starts mid-chunk
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
    taosMemoryFree(buf);
  } else {
    *ppBlock = (uint8_t *)buf;
  }
  return code;
}
412
#endif
413
/**
 * Read `size` bytes at logical offset `offset` from an S3-backed file.
 *
 * Pages are first served from the page cache. At the first cache miss the
 * remaining page range (extended to cover `szHint` when it is positive) is
 * fetched in one tsdbReadFileBlock() call, stored in the cache, and copied
 * out.
 *
 * Without USE_S3 this always returns TSDB_CODE_INTERNAL_ERROR.
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA for an in-page offset outside
 *         the content area; TSDB_CODE_FILE_CORRUPTED on checksum mismatch;
 *         otherwise the error of the failing call
 */
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
#ifdef USE_S3
  int32_t  code = 0;
  int32_t  lino = 0;
  int64_t  n = 0;
  int32_t  szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t  fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t  pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t  bOffset = fOffset % pFD->szPage;
  uint8_t *pBlock = NULL;  // fetched block; released at _exit on every path

  if (bOffset >= szPgCont) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }

  // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
  // 2, fetch pgnoStart ~ pgnoEnd from s3
  // 3, store pgs to pcache & last pg to pFD->pBuf
  // 4, deliver pgs to [pBuf, pBuf + size)

  while (n < size) {
    if (pFD->pgno != pgno) {
      LRUHandle *handle = NULL;
      code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
      if (code != TSDB_CODE_SUCCESS) {
        if (handle) {
          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
        }
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (!handle) {
        // Cache miss: fetch the rest from S3 below.
        break;
      }

      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
      memcpy(pFD->pBuf, pPage, pFD->szPage);
      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);

      // check
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }

      pFD->pgno = pgno;
    }

    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

    n += nRead;
    ++pgno;
    bOffset = 0;
  }

  if (n < size) {
    // 2, retrieve pgs from s3
    int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
    int64_t pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;

    if (szHint > 0) {
      pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
    }

    int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;

    code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, 1, &pBlock);
    TSDB_CHECK_CODE(code, lino, _exit);

    // 3, Store Pages in Cache
    // 64-bit count and index keep the page arithmetic out of `int`.
    int64_t nPage = pgnoEnd - pgno + 1;
    for (int64_t i = 0; i < nPage; ++i) {
      uint8_t *pPage = pBlock + i * pFD->szPage;

      if (pFD->szFile != pgno) {  // DONOT cache last volatile page
        tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pPage);
      }

      // Pages fetched only because of szHint are cached but not delivered.
      if (szHint > 0 && n >= size) {
        ++pgno;
        continue;
      }
      memcpy(pFD->pBuf, pPage, pFD->szPage);

      // check
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }

      pFD->pgno = pgno;

      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

      n += nRead;
      ++pgno;
      bOffset = 0;
    }
  }

_exit:
  // Freed here so that the checksum-failure path does not leak the block.
  taosMemoryFree(pBlock);
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
#else
  return TSDB_CODE_INTERNAL_ERROR;
#endif
}
521

522
/**
 * Read `size` bytes at logical offset `offset` into `pBuf`.
 *
 * Opens the file on first use, then dispatches to the S3 reader for S3 files
 * with `lcn > 1` and to the local page reader otherwise. `szHint` is used by
 * the S3 path only; the encryption arguments by the local path only.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
                     int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (pFD->pFD == NULL) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pFD->s3File && pFD->lcn > 1 /* && tsS3BlockSize < 0*/) {
    code = tsdbReadFileS3(pFD, offset, pBuf, size, szHint);
  } else {
    code = tsdbReadFileImp(pFD, offset, pBuf, size, encryptAlgorithm, encryptKey);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
546

547
/**
 * Append `size` bytes read at logical offset `offset` to `buffer`.
 *
 * The buffer is grown first; its size is advanced only after the read
 * succeeds.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
                             int32_t encryptAlgorithm, char *encryptKey) {
  int32_t lino;
  int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size);
  TSDB_CHECK_CODE(code, lino, _exit);

  uint8_t *dst = (uint8_t *)tBufferGetDataEnd(buffer);
  code = tsdbReadFile(pFD, offset, dst, size, szHint, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  buffer->size += size;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
566

567
/**
 * Flush the buffered page of `pFD` and fsync the underlying file.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t lino;
  int32_t code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosFsyncFile(pFD->pFD) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
584

585
// SDataFReader ====================================================
586
/**
 * Create a reader over the file set `pSet`: one descriptor each for the
 * head, data and sma files plus one per stt file, all with TD_FILE_READ.
 *
 * On failure every descriptor created so far is closed, the reader is freed
 * and `*ppReader` is set to NULL.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
  char    fname[TSDB_FILENAME_LEN];

  // allocate the reader
  SDataFReader *pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pReader->pTsdb = pTsdb;
  pReader->pSet = pSet;

  // head file
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // data file
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // sma file
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // stt files
  for (int32_t i = 0; i < pSet->nSttF; i++) {
    tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[i], fname);
    code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[i], 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (!code) {
    *ppReader = pReader;
  } else {
    *ppReader = NULL;
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));

    if (pReader) {
      for (int32_t i = 0; i < pSet->nSttF; i++) {
        tsdbCloseFile(&pReader->aSttFD[i]);
      }
      tsdbCloseFile(&pReader->pSmaFD);
      tsdbCloseFile(&pReader->pDataFD);
      tsdbCloseFile(&pReader->pHeadFD);
      taosMemoryFree(pReader);
    }
  }
  return code;
}
640

641
/**
 * Destroy a reader created by tsdbDataFReaderOpen(): close all of its file
 * descriptors, free its scratch buffers and the reader itself, and reset
 * `*ppReader` to NULL. Does nothing when `*ppReader` is already NULL.
 */
void tsdbDataFReaderClose(SDataFReader **ppReader) {
  SDataFReader *pReader = *ppReader;
  if (pReader == NULL) {
    return;
  }

  tsdbCloseFile(&pReader->pHeadFD);
  tsdbCloseFile(&pReader->pDataFD);
  tsdbCloseFile(&pReader->pSmaFD);

  for (int32_t i = 0; i < TSDB_STT_TRIGGER_ARRAY_SIZE; i++) {
    if (pReader->aSttFD[i] == NULL) {
      continue;
    }
    tsdbCloseFile(&pReader->aSttFD[i]);
  }

  size_t nBuf = sizeof(pReader->aBuf) / sizeof(uint8_t *);
  for (size_t i = 0; i < nBuf; i++) {
    tFree(pReader->aBuf[i]);
  }

  taosMemoryFree(pReader);
  *ppReader = NULL;
}
666

667
/**
 * Load the block-index section of the head file into `aBlockIdx`.
 *
 * The section spans [pHeadF->offset, pHeadF->size). The array is cleared
 * first; an empty section leaves it empty and returns success.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the decoded length
 *         does not match the section size; otherwise the failing call's error
 */
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
  int32_t    code = 0;
  int32_t    lino;
  SHeadFile *pHead = pReader->pSet->pHeadF;
  int64_t    start = pHead->offset;
  int64_t    nBytes = pHead->size - start;

  taosArrayClear(aBlockIdx);
  if (nBytes == 0) {
    return code;
  }

  // make room for the raw section
  code = tRealloc(&pReader->aBuf[0], nBytes);
  TSDB_CHECK_CODE(code, lino, _exit);

  // load it
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pHeadFD, start, pReader->aBuf[0], nBytes, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode entry by entry
  int64_t used = 0;
  while (used < nBytes) {
    SBlockIdx entry;
    used += tGetBlockIdx(pReader->aBuf[0] + used, &entry);

    if (taosArrayPush(aBlockIdx, &entry) == NULL) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }
  if (used != nBytes) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
709

710
/**
 * Load the stt-block section of stt file `iStt` into `aSttBlk`.
 *
 * The section spans [offset, size) of the stt file record. The array is
 * cleared first; an empty section leaves it empty and returns success.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the decoded length
 *         does not match the section size; otherwise the failing call's error
 */
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
  int32_t   code = 0;
  int32_t   lino;
  SSttFile *pStt = pReader->pSet->aSttF[iStt];
  int64_t   start = pStt->offset;
  int64_t   nBytes = pStt->size - start;

  taosArrayClear(aSttBlk);
  if (nBytes == 0) {
    return 0;
  }

  // make room for the raw section
  code = tRealloc(&pReader->aBuf[0], nBytes);
  TSDB_CHECK_CODE(code, lino, _exit);

  // load it
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->aSttFD[iStt], start, pReader->aBuf[0], nBytes, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode entry by entry
  int64_t used = 0;
  while (used < nBytes) {
    SSttBlk entry;
    used += tGetSttBlk(pReader->aBuf[0] + used, &entry);

    if (taosArrayPush(aSttBlk, &entry) == NULL) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }
  if (used != nBytes) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
752

753
/**
 * Load the data-block map referenced by `pBlockIdx` from the head file and
 * decode it into `mDataBlk`.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the decoded length
 *         differs from `pBlockIdx->size`; otherwise the failing call's error
 */
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t offset = pBlockIdx->offset;
  int64_t size = pBlockIdx->size;

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode
  int32_t n = 0;
  code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
  // Go through TSDB_CHECK_CODE so `lino` is set before it is logged at _exit.
  TSDB_CHECK_CODE(code, lino, _exit);
  if (n != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
783

784
// SDelFReader ====================================================
785
struct SDelFReader {
786
  STsdb   *pTsdb;
787
  SDelFile fDel;
788
  STsdbFD *pReadH;
789
  uint8_t *aBuf[1];
790
};
791

792
/**
 * Create a reader for the delete file described by `pFile`.
 *
 * The file record is copied into the reader and a TD_FILE_READ descriptor is
 * created for it. On failure the reader is freed and `*ppReader` is NULL.
 *
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  // allocate the reader
  SDelFReader *pNew = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pNew));
  if (pNew == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // fill it in and create the file descriptor
  pNew->pTsdb = pTsdb;
  pNew->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pNew->pReadH, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (!code) {
    *ppReader = pNew;
  } else {
    *ppReader = NULL;
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
    taosMemoryFree(pNew);
  }
  return code;
}
822

823
/**
 * Destroy a reader created by tsdbDelFReaderOpen(): close its descriptor,
 * free its scratch buffers and the reader itself. `*ppReader` is always set
 * to NULL on return.
 */
void tsdbDelFReaderClose(SDelFReader **ppReader) {
  SDelFReader *pReader = *ppReader;

  if (pReader) {
    tsdbCloseFile(&pReader->pReadH);
    // size_t index matches the unsigned sizeof-based bound.
    for (size_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
      tFree(pReader->aBuf[iBuf]);
    }
    taosMemoryFree(pReader);
  }

  *ppReader = NULL;
}
×
837

838
/**
 * Read all delete records referenced by `pDelIdx`, with no version limit
 * (forwards to tsdbReadDelDatav1() with maxVer = INT64_MAX).
 */
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
  const int64_t noVersionLimit = INT64_MAX;
  return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, noVersionLimit);
}
841

842
/**
 * Read the delete records referenced by `pDelIdx` into `aDelData`, keeping
 * only records whose version does not exceed `maxVer`.
 *
 * The array is cleared first. Records above `maxVer` are still decoded, so
 * the final length check covers the whole section.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the decoded length
 *         differs from `pDelIdx->size`; otherwise the failing call's error
 */
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
  int32_t code = 0;
  int32_t lino;
  int64_t start = pDelIdx->offset;
  int64_t nBytes = pDelIdx->size;
  int64_t used;

  taosArrayClear(aDelData);

  // make room for the raw section
  code = tRealloc(&pReader->aBuf[0], nBytes);
  TSDB_CHECK_CODE(code, lino, _exit);

  // load it
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pReadH, start, pReader->aBuf[0], nBytes, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode, filtering by version
  for (used = 0; used < nBytes;) {
    SDelData rec;
    used += tGetDelData(pReader->aBuf[0] + used, &rec);

    if (rec.version <= maxVer && taosArrayPush(aDelData, &rec) == NULL) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  if (used != nBytes) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
885

886
/**
 * Load the delete-index section of the delete file into `aDelIdx`.
 *
 * The section spans [fDel.offset, fDel.size). The array is cleared first.
 *
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the decoded length
 *         does not match the section size; otherwise the failing call's error
 */
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t n;  // 64-bit like `size`, matching the other decoders in this file
  int64_t offset = pReader->fDel.offset;
  int64_t size = pReader->fDel.size - offset;

  taosArrayClear(aDelIdx);

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode
  n = 0;
  while (n < size) {
    SDelIdx delIdx;

    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);

    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  if (n != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc