• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4870

26 Nov 2025 05:46AM UTC coverage: 64.545% (+0.006%) from 64.539%
#4870

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

768 of 945 new or added lines in 33 files covered. (81.27%)

2982 existing lines in 119 files now uncovered.

158219 of 245129 relevant lines covered (64.55%)

112474797.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.16
/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "tss.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20
#include "vnd.h"
21

22
/*
 * Open the OS file behind `pFD` and allocate its one-page I/O buffer.
 *
 * If the plain path cannot be opened and shared storage is enabled, the
 * handle has lcn > 1, and the path ends in ".data", the function falls back
 * to the local chunk file "<stem>.<lcn>.data" and marks the handle as a
 * shared-storage file (pFD->ssFile = 1).
 *
 * On success pFD->szFile holds the file size expressed in pages.
 *
 * Returns 0 on success, otherwise an error code (also logged).
 */
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
  int32_t     code = 0;
  int32_t     lino;
  const char *path = pFD->path;
  int32_t     szPage = pFD->szPage;
  int32_t     flag = pFD->flag;
  int64_t     lc_size = 0;

  pFD->pFD = taosOpenFile(path, flag);
  if (pFD->pFD == NULL) {
    // Capture the open failure before any later call can overwrite terrno.
    int32_t openErr = terrno;

    if (TD_FILE_READ == flag) {
      int32_t expired = grantCheck(TSDB_GRANT_SHARED_STORAGE);
      if (expired && tsSsEnabled) {
        tsdbWarn("s3 grant expired: %d", expired);
        tsSsEnabled = false;
      } else if (!expired && tsSsEnabled) {
        // NOTE(review): this assignment is a no-op as written (flag is already true).
        tsSsEnabled = true;
      }
    }

    // Only look at the suffix when the path is long enough to have one.
    size_t pathLen = strlen(path);
    bool   isDataFile = (pathLen >= 5) && (strncmp(path + pathLen - 5, ".data", 5) == 0);

    if (tsSsEnabled && pFD->lcn > 1 && isDataFile) {
      char lc_path[TSDB_FILENAME_LEN];
      // Bound the copy by the destination buffer, not by an unrelated constant.
      tstrncpy(lc_path, path, sizeof(lc_path));

      int32_t     vid = 0;
      const char *object_name = taosDirEntryBaseName((char *)path);
      (void)sscanf(object_name, "v%df%dver%" PRId64, &vid, &pFD->fid, &pFD->cid);

      char *dot = strrchr(lc_path, '.');
      if (!dot) {
        tsdbError("unexpected path: %s", lc_path);
        TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
      }
      // Rewrite the suffix in place: "<stem>.data" -> "<stem>.<lcn>.data".
      snprintf(dot + 1, sizeof(lc_path) - (size_t)(dot + 1 - lc_path), "%d.data", pFD->lcn);

      pFD->pFD = taosOpenFile(lc_path, flag);
      if (pFD->pFD == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
      if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
    } else {
      tsdbInfo("no file: %s", path);
      // A NULL handle must never be reported as success.
      code = openErr ? openErr : TAOS_SYSTEM_ERROR(ENOENT);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    pFD->ssFile = 1;
  }

  pFD->pBuf = taosMemoryCalloc(1, szPage);
  if (pFD->pBuf == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  if (lc_size > 0) {
    // Logical size = bytes in the local (last) chunk + all preceding full chunks.
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;

    pFD->szFile = lc_size + chunksize * (pFD->lcn - 1);
  }

  // not check file size when reading data files.
  if (flag != TD_FILE_READ) {
    if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  // The file must consist of whole pages; szFile is kept in pages from here on.
  if (pFD->szFile % szPage != 0) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }
  pFD->szFile = pFD->szFile / szPage;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
101

102
// =============== PAGE-WISE FILE ===============
103
/*
 * Allocate and initialise a page-wise file handle for `path`.
 *
 * The handle and a private copy of the path live in one allocation (the path
 * is stored immediately after the struct). The OS file itself is not opened
 * here; pFD->pFD stays NULL until a read/write path opens it.
 *
 * ppFD receives the new handle, or NULL on failure.
 * Returns 0 on success, otherwise the allocation error code.
 */
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
  int32_t  code = 0;
  int32_t  lino;
  STsdbFD *pFD = NULL;
  int32_t  szPage = pTsdb->pVnode->config.tsdbPageSize;
  size_t   pathSize = strlen(path) + 1;  // including the terminating NUL

  *ppFD = NULL;

  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + pathSize);
  if (pFD == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pFD->path = (char *)&pFD[1];
  memcpy(pFD->path, path, pathSize);
  pFD->szPage = szPage;
  pFD->flag = flag;
  pFD->pgno = 0;
  pFD->lcn = lcn;
  pFD->pTsdb = pTsdb;

  *ppFD = pFD;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
  }
  return code;
}
133

134
/*
 * Release a handle created by tsdbOpenFile: free the page buffer, close the
 * OS file, free the handle and reset *ppFD to NULL. A NULL *ppFD is a no-op.
 * A close failure is only logged.
 */
void tsdbCloseFile(STsdbFD **ppFD) {
  STsdbFD *fd = *ppFD;
  if (fd == NULL) {
    return;
  }

  taosMemoryFree(fd->pBuf);

  int32_t rc = taosCloseFile(&fd->pFD);
  if (rc == 0) {
    tsdbTrace("close file: %s", fd->path);
  } else {
    tsdbError("failed to close file: %s, code:%d reason:%s", fd->path, rc, tstrerror(rc));
  }

  taosMemoryFree(fd);
  *ppFD = NULL;
}
256,730,226✔
148

149
/*
 * Write the page buffered in pFD->pBuf back to the file, if one is pending
 * (pFD->pgno > 0), then mark the buffer as holding no page (pgno = 0).
 *
 * The file is opened on demand. Before writing, a checksum is appended to the
 * page and, when encryptAlgorithm == DND_CA_SM4, the page is encrypted in
 * place in 128-byte units. pFD->szFile is raised if the write extended the file.
 *
 * Returns 0 on success, otherwise an error code (also logged).
 */
static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (pFD->pFD == NULL) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pFD->pgno > 0) {
    int64_t pos = PAGE_OFFSET(pFD->pgno, pFD->szPage);

    // For a shared-storage chunk file, translate to an offset inside the local chunk.
    if (pFD->ssFile && pFD->lcn > 1) {
      SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
      int64_t    szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
      pos -= szChunk * (pFD->lcn - 1);
    }

    if (taosLSeekFile(pFD->pFD, pos, SEEK_SET) < 0) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }

    code = taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (encryptAlgorithm == DND_CA_SM4) {
      unsigned char cipher[128];
      int           outLen = 0;

      // Encrypt unit by unit, overwriting the plaintext in the page buffer.
      for (int32_t done = 0; done < pFD->szPage; done += outLen) {
        SCryptOpts opts = {0};
        opts.len = 128;
        opts.source = pFD->pBuf + done;
        opts.result = cipher;
        opts.unitLen = 128;
        tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);

        outLen = CBC_Encrypt(&opts);
        memcpy(pFD->pBuf + done, cipher, outLen);
      }
    }

    if (taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage) < 0) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }

    if (pFD->pgno > pFD->szFile) {
      pFD->szFile = pFD->pgno;
    }
  }
  pFD->pgno = 0;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
214

215
/*
 * Load page `pgno` from the file into pFD->pBuf and record it in pFD->pgno.
 *
 * The file is opened on demand. When pFD->lcn > 1 the page offset is shifted
 * so that it is relative to the local chunk file. A short read or a checksum
 * mismatch is reported as TSDB_CODE_FILE_CORRUPTED. When
 * encryptAlgorithm == DND_CA_SM4 the page is decrypted in place in 128-byte
 * units before the checksum is verified.
 *
 * Returns 0 on success, otherwise an error code (also logged).
 */
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (!pFD->pFD) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
  if (pFD->lcn > 1) {
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
    int64_t    chunkoffset = chunksize * (pFD->lcn - 1);

    offset -= chunkoffset;
  }

  // seek
  int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // read
  n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  } else if (n < pFD->szPage) {
    // Report the page that was requested; pFD->pgno still names the previously cached page.
    tsdbError(
        "vgId:%d %s failed at %s:%d since read file size is less than page size, "
        "read size: %" PRId64 ", page size: %d, fname:%s, pgno:%" PRId64,
        TD_VID(pFD->pTsdb->pVnode), __func__, __FILE__, __LINE__, n, pFD->szPage, pFD->path, pgno);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  if (encryptAlgorithm == DND_CA_SM4) {
    unsigned char PacketData[128];
    int           NewLen;

    // Decrypt unit by unit, overwriting the ciphertext in the page buffer.
    int32_t count = 0;
    while (count < pFD->szPage) {
      SCryptOpts opts = {0};
      opts.len = 128;
      opts.source = pFD->pBuf + count;
      opts.result = PacketData;
      opts.unitLen = 128;
      tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);

      NewLen = CBC_Decrypt(&opts);

      memcpy(pFD->pBuf + count, PacketData, NewLen);
      count += NewLen;
    }
  }

  // check
  // NOTE(review): pages with pgno <= 1 are not checksum-verified here; the reason is not visible in this function.
  if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
    tsdbError("vgId:%d %s failed at %s:%d since checksum mismatch, fname:%s, pgno:%" PRId64, TD_VID(pFD->pTsdb->pVnode),
              __func__, __FILE__, __LINE__, pFD->path, pgno);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  pFD->pgno = pgno;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
289

290
/*
 * Write `size` bytes from pBuf at logical offset `offset`, going through the
 * one-page buffer in pFD.
 *
 * Whenever the target page differs from the buffered one, the buffered page
 * is flushed first; the target page is then read back if it lies within the
 * current file size (pgno <= pFD->szFile), otherwise the buffer is simply
 * re-labelled with the new page number. The last touched page stays buffered
 * and is not written here.
 *
 * Returns 0 on success, otherwise an error code (also logged).
 */
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
                      char *encryptKey) {
  int32_t code = 0;
  int32_t lino;
  int64_t filePos = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(filePos, pFD->szPage);
  int64_t inPage = filePos % pFD->szPage;  // write position inside the current page
  int64_t written = 0;

  // The body runs at least once, even when size is 0.
  for (;;) {
    if (pFD->pgno != pgno) {
      code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pgno > pFD->szFile) {
        pFD->pgno = pgno;
      } else {
        code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    int64_t room = PAGE_CONTENT_SIZE(pFD->szPage) - inPage;
    int64_t left = size - written;
    int64_t nWrite = TMIN(room, left);
    memcpy(pFD->pBuf + inPage, pBuf + written, nWrite);

    written += nWrite;
    inPage = 0;
    pgno++;

    if (written >= size) {
      break;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
326

327
/*
 * Copy `size` bytes starting at logical offset `offset` into pBuf, reading
 * the file one page at a time through pFD->pBuf. A page already held in the
 * buffer (pFD->pgno == pgno) is reused without touching the file.
 *
 * Returns TSDB_CODE_INVALID_PARA if the offset falls outside the content
 * area of its page, otherwise 0 or the error from the page read.
 */
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
                               char *encryptKey) {
  int32_t code = 0;
  int32_t lino;
  int64_t copied = 0;
  int64_t filePos = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(filePos, pFD->szPage);
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t inPage = filePos % pFD->szPage;

  if (inPage >= szPgCont) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }

  // Only the first page can start mid-page; every following page starts at 0.
  for (; copied < size; ++pgno, inPage = 0) {
    if (pFD->pgno != pgno) {
      code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    int64_t nRead = TMIN(szPgCont - inPage, size - copied);
    memcpy(pBuf + copied, pFD->pBuf + inPage, nRead);
    copied += nRead;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
361

362

363
#ifdef USE_SHARED_STORAGE
UNCOV
364
/*
 * Read `size` raw bytes starting at file offset `offset` into a newly
 * allocated buffer.
 *
 * The range is served chunk by chunk: offsets that fall in chunk number
 * >= pFD->lcn are read from the locally opened file; earlier chunks are
 * fetched through tssReadFileFromDefault when shared storage is enabled,
 * otherwise the call fails with TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * On success *ppBlock receives the buffer and the caller must free it.
 * On failure the buffer is freed here and *ppBlock is left untouched.
 */
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, uint8_t **ppBlock) {
  int32_t code = 0, lino, vid = TD_VID(pFD->pTsdb->pVnode);

  char *buf = taosMemoryCalloc(1, size);
  if (buf == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
  int64_t    szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;

  for (int64_t n = 0, nRead = 0; n < size; n += nRead, offset += nRead) {
    int chunk = offset / szChunk + 1;

    if (chunk >= pFD->lcn) {
      // Local chunk: seek relative to the start of the local file.
      if (taosLSeekFile(pFD->pFD, offset - szChunk * (pFD->lcn - 1), SEEK_SET) < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }

      int64_t toRead = size - n;
      nRead = taosReadFile(pFD->pFD, buf + n, toRead);
      if (nRead < 0) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      } else if (nRead < toRead) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }
    } else if (tsSsEnabled) {
      // Remote chunk: never read across a chunk boundary in one request.
      int64_t toRead = TMIN(szChunk - offset % szChunk, size - n);
      nRead = toRead;

      char path[TSDB_FILENAME_LEN];
      int  len = snprintf(path, sizeof(path), "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, pFD->fid, vid, pFD->fid,
                          pFD->cid, chunk);
      // A truncated object name would address the wrong object; fail instead.
      if (len < 0 || (size_t)len >= sizeof(path)) {
        TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
      }

      code = tssReadFileFromDefault(path, offset % szChunk, buf + n, &nRead);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (nRead < toRead) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }
    } else {
      TSDB_CHECK_CODE(code = TSDB_CODE_OPS_NOT_SUPPORT, lino, _exit);
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
    taosMemoryFree(buf);
  } else {
    *ppBlock = (uint8_t *)buf;
  }
  return code;
}
416
#endif
417

418

419

420
/*
 * Read `size` bytes at logical offset `offset` from a shared-storage backed
 * file into pBuf.
 *
 * Pages are first served from the page cache. At the first cache miss the
 * remaining pages (or, when szHint > 0, the range derived from szHint) are
 * fetched in one block via tsdbReadFileBlock, stored in the cache, and the
 * requested bytes are copied out.
 *
 * Without USE_SHARED_STORAGE this always returns TSDB_CODE_INTERNAL_ERROR.
 */
static int32_t tsdbReadFileSs(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
#ifdef USE_SHARED_STORAGE
  int32_t  code = 0;
  int32_t  lino;
  int64_t  n = 0;
  uint8_t *pBlock = NULL;  // fetched block; released at _exit on every path
  int32_t  szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t  fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t  pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t  bOffset = fOffset % pFD->szPage;

  if (bOffset >= szPgCont) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
  }

  // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
  // 2, fetch pgnoStart ~ pgnoEnd from s3
  // 3, store pgs to pcache & last pg to pFD->pBuf
  // 4, deliver pgs to [pBuf, pBuf + size)

  while (n < size) {
    if (pFD->pgno != pgno) {
      LRUHandle *handle = NULL;
      code = tsdbCacheGetPageSs(pFD->pTsdb->pgCache, pFD, pgno, &handle);
      if (code != TSDB_CODE_SUCCESS) {
        if (handle) {
          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
        }
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // Cache miss: stop here and fetch the rest as one block below.
      if (!handle) {
        break;
      }

      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
      memcpy(pFD->pBuf, pPage, pFD->szPage);
      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);

      // check
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }

      pFD->pgno = pgno;
    }

    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

    n += nRead;
    ++pgno;
    bOffset = 0;
  }

  if (n < size) {
    // 2, retrieve pgs from s3
    int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
    int64_t pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;

    if (szHint > 0) {
      pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
    }

    int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;

    code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, &pBlock);
    TSDB_CHECK_CODE(code, lino, _exit);

    // 3, Store Pages in Cache
    int nPage = pgnoEnd - pgno + 1;
    for (int i = 0; i < nPage; ++i) {
      if (pFD->szFile != pgno) {  // DONOT cache last volatile page
        tsdbCacheSetPageSs(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
      }

      // Pages fetched only because of the hint are cached but not delivered.
      if (szHint > 0 && n >= size) {
        ++pgno;
        continue;
      }
      memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);

      // check (pBlock is released at _exit, so bailing out here does not leak it)
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
      }

      pFD->pgno = pgno;

      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

      n += nRead;
      ++pgno;
      bOffset = 0;
    }
  }

_exit:
  taosMemoryFree(pBlock);
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
#else
  return TSDB_CODE_INTERNAL_ERROR;
#endif
}
528

529
/*
 * Read `size` bytes at logical offset `offset` into pBuf.
 *
 * Opens the file on demand, then dispatches: handles flagged as
 * shared-storage files with lcn > 1 go through tsdbReadFileSs (szHint is
 * forwarded there); everything else goes through tsdbReadFileImp with the
 * given encryption settings.
 *
 * Returns 0 on success, otherwise an error code (also logged).
 */
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
                     int32_t encryptAlgorithm, char *encryptKey) {
  int32_t code = 0;
  int32_t lino;

  if (pFD->pFD == NULL) {
    code = tsdbOpenFileImpl(pFD);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (!(pFD->ssFile && pFD->lcn > 1)) {
    code = tsdbReadFileImp(pFD, offset, pBuf, size, encryptAlgorithm, encryptKey);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbReadFileSs(pFD, offset, pBuf, size, szHint);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
553

554
/*
 * Append `size` bytes read at logical offset `offset` to `buffer`.
 *
 * Capacity is grown first; the data is read directly into the buffer's tail
 * and buffer->size is advanced only after the read succeeded.
 *
 * Returns 0 on success, otherwise an error code (also logged).
 */
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
                             int32_t encryptAlgorithm, char *encryptKey) {
  int32_t lino;
  int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size);
  TSDB_CHECK_CODE(code, lino, _exit);

  uint8_t *tail = (uint8_t *)tBufferGetDataEnd(buffer);
  code = tsdbReadFile(pFD, offset, tail, size, szHint, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  buffer->size += size;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
573

574
/*
 * Flush the buffered page (if any) with tsdbWriteFilePage and then fsync the
 * underlying file.
 *
 * Returns 0 on success, otherwise an error code (also logged).
 */
int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
  int32_t lino;
  int32_t code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t rc = taosFsyncFile(pFD->pFD);
  if (rc < 0) {
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ERRNO), lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
  }
  return code;
}
591

592
// SDataFReader ====================================================
UNCOV
593
/*
 * Create a reader over one file set: allocates the reader and creates
 * read-mode handles for the head, data, sma and every stt file of `pSet`.
 *
 * On success *ppReader receives the reader (release with
 * tsdbDataFReaderClose). On failure every handle created so far is closed,
 * the reader is freed and *ppReader is set to NULL.
 */
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SDataFReader *pReader = NULL;
  char          fname[TSDB_FILENAME_LEN];

  // alloc
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pReader->pTsdb = pTsdb;
  pReader->pSet = pSet;

  // head
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // data
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // sma
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // stt
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
    tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
    code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt], 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    *ppReader = NULL;
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));

    if (pReader) {
      // Handles that were never created are still NULL (reader is zero-allocated); closing them is a no-op.
      for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
      tsdbCloseFile(&pReader->pSmaFD);
      tsdbCloseFile(&pReader->pDataFD);
      tsdbCloseFile(&pReader->pHeadFD);
      taosMemoryFree(pReader);
    }
  } else {
    *ppReader = pReader;
  }
  return code;
}
647

648
/*
 * Destroy a reader created by tsdbDataFReaderOpen: closes the head, data,
 * sma and stt handles, frees the scratch buffers and the reader itself, and
 * resets *ppReader to NULL. A NULL *ppReader is a no-op.
 */
void tsdbDataFReaderClose(SDataFReader **ppReader) {
  SDataFReader *reader = *ppReader;
  if (reader == NULL) return;

  tsdbCloseFile(&reader->pHeadFD);
  tsdbCloseFile(&reader->pDataFD);
  tsdbCloseFile(&reader->pSmaFD);

  for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
    if (reader->aSttFD[iStt]) {
      tsdbCloseFile(&reader->aSttFD[iStt]);
    }
  }

  for (int32_t iBuf = 0; iBuf < sizeof(reader->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree(reader->aBuf[iBuf]);
  }

  taosMemoryFree(reader);
  *ppReader = NULL;
}
673

UNCOV
674
/*
 * Load the block-index section of the head file (bytes [offset, size) as
 * recorded in the head file descriptor) and decode it into aBlockIdx.
 *
 * aBlockIdx is cleared first; an empty section yields an empty array.
 * Returns TSDB_CODE_FILE_CORRUPTED if decoding does not consume exactly the
 * section length.
 */
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
  int32_t    code = 0;
  int32_t    lino;
  SHeadFile *pHeadFile = pReader->pSet->pHeadF;
  int64_t    offset = pHeadFile->offset;
  int64_t    size = pHeadFile->size - offset;
  int64_t    consumed = 0;
  SVnodeCfg *pCfg = &pReader->pTsdb->pVnode->config;

  taosArrayClear(aBlockIdx);
  if (size == 0) {
    return code;
  }

  // Make room for the raw section, then read it in one go.
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, pCfg->tsdbCfg.encryptAlgorithm,
                      pCfg->tsdbCfg.encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Decode entries back to back.
  while (consumed < size) {
    SBlockIdx blockIdx;
    consumed += tGetBlockIdx(pReader->aBuf[0] + consumed, &blockIdx);

    if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }
  if (consumed != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
716

717
/*
 * Load the stt-block index of stt file number `iStt` (bytes [offset, size)
 * as recorded in its file descriptor) and decode it into aSttBlk.
 *
 * aSttBlk is cleared first; an empty section yields an empty array.
 * Returns TSDB_CODE_FILE_CORRUPTED if decoding does not consume exactly the
 * section length.
 */
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
  int32_t    code = 0;
  int32_t    lino;
  SSttFile  *pSttFile = pReader->pSet->aSttF[iStt];
  int64_t    offset = pSttFile->offset;
  int64_t    size = pSttFile->size - offset;
  int64_t    consumed = 0;
  SVnodeCfg *pCfg = &pReader->pTsdb->pVnode->config;

  taosArrayClear(aSttBlk);
  if (size == 0) {
    return 0;
  }

  // Make room for the raw section, then read it in one go.
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0, pCfg->tsdbCfg.encryptAlgorithm,
                      pCfg->tsdbCfg.encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Decode entries back to back.
  while (consumed < size) {
    SSttBlk sttBlk;
    consumed += tGetSttBlk(pReader->aBuf[0] + consumed, &sttBlk);

    if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }
  if (consumed != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
759

760
/*
 * Read the data-block map referenced by `pBlockIdx` (offset/size inside the
 * head file) and decode it into mDataBlk.
 *
 * Returns TSDB_CODE_FILE_CORRUPTED if the decoded length differs from the
 * recorded size, otherwise 0 or the error from allocation, read or decode.
 */
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
  int32_t code = 0;
  int32_t lino;
  int64_t offset = pBlockIdx->offset;
  int64_t size = pBlockIdx->size;

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode
  int32_t n = 0;
  code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
  // Go through the macro so `lino` is set before the error log reads it.
  TSDB_CHECK_CODE(code, lino, _exit);
  if (n != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
790

791
// SDelFReader ====================================================
792
struct SDelFReader {
793
  STsdb   *pTsdb;
794
  SDelFile fDel;
795
  STsdbFD *pReadH;
796
  uint8_t *aBuf[1];
797
};
798

UNCOV
799
/*
 * Create a reader for the delete file described by `pFile`.
 *
 * On success *ppReader receives the reader (release with
 * tsdbDelFReaderClose). On failure the reader is freed and *ppReader is set
 * to NULL.
 */
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  SDelFReader *reader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*reader));
  if (reader == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  reader->pTsdb = pTsdb;
  reader->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &reader->pReadH, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    *ppReader = reader;
  } else {
    *ppReader = NULL;
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
    taosMemoryFree(reader);
  }
  return code;
}
829

830
/*
 * Destroy a delete-file reader: closes its read handle, frees its scratch
 * buffers and the reader itself. *ppReader is always reset to NULL.
 */
void tsdbDelFReaderClose(SDelFReader **ppReader) {
  SDelFReader *reader = *ppReader;

  if (reader != NULL) {
    tsdbCloseFile(&reader->pReadH);

    for (int32_t iBuf = 0; iBuf < sizeof(reader->aBuf) / sizeof(uint8_t *); iBuf++) {
      tFree(reader->aBuf[iBuf]);
    }

    taosMemoryFree(reader);
  }

  *ppReader = NULL;
}
×
844

845
/*
 * Read the delete records referenced by `pDelIdx` into aDelData.
 * Forwards to tsdbReadDelDatav1 with INT64_MAX as the version limit.
 */
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
  const int64_t versionLimit = INT64_MAX;
  return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, versionLimit);
}
848

849
/*
 * Read the delete records referenced by `pDelIdx` (offset/size inside the
 * delete file) and append those whose version is <= maxVer to aDelData.
 *
 * aDelData is cleared first. Returns TSDB_CODE_FILE_CORRUPTED if decoding
 * does not consume exactly the recorded size.
 */
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
  int32_t    code = 0;
  int32_t    lino;
  int64_t    offset = pDelIdx->offset;
  int64_t    size = pDelIdx->size;
  int64_t    consumed = 0;
  SVnodeCfg *pCfg = &pReader->pTsdb->pVnode->config;

  taosArrayClear(aDelData);

  // Make room for the raw records, then read them in one go.
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, pCfg->tsdbCfg.encryptAlgorithm,
                      pCfg->tsdbCfg.encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Decode every record; records newer than maxVer are decoded but not kept.
  while (consumed < size) {
    SDelData delData;
    consumed += tGetDelData(pReader->aBuf[0] + consumed, &delData);

    if (delData.version <= maxVer && taosArrayPush(aDelData, &delData) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  if (consumed != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
892

893
/*
 * Load the delete-index section of the delete file (bytes [offset, size) as
 * recorded in the reader's file descriptor) and decode it into aDelIdx.
 *
 * aDelIdx is cleared first. Returns TSDB_CODE_FILE_CORRUPTED if decoding
 * does not consume exactly the section length.
 */
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
  int32_t code = 0;
  int32_t lino;
  int64_t n;  // decode cursor; 64-bit so it cannot wrap while compared against `size`
  int64_t offset = pReader->fDel.offset;
  int64_t size = pReader->fDel.size - offset;

  taosArrayClear(aDelIdx);

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  TSDB_CHECK_CODE(code, lino, _exit);

  // read
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
  TSDB_CHECK_CODE(code, lino, _exit);

  // decode
  n = 0;
  while (n < size) {
    SDelIdx delIdx;

    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);

    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
    }
  }

  if (n != size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc