• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3542

27 Nov 2024 02:52AM UTC coverage: 60.819% (+0.04%) from 60.776%
#3542

push

travis-ci

web-flow
Merge pull request #28931 from taosdata/enh/jdbc-demo-3.0

update jdbc demo, and version history

120305 of 252779 branches covered (47.59%)

Branch coverage included in aggregate %.

201010 of 275538 relevant lines covered (72.95%)

19989893.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.59
/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "tcs.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20
#include "vnd.h"
21

22
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
7,129,669✔
23
  int32_t     code = 0;
7,129,669✔
24
  int32_t     lino;
25
  const char *path = pFD->path;
7,129,669✔
26
  int32_t     szPage = pFD->szPage;
7,129,669✔
27
  int32_t     flag = pFD->flag;
7,129,669✔
28
  int64_t     lc_size = 0;
7,129,669✔
29

30
  pFD->pFD = taosOpenFile(path, flag);
7,129,669✔
31
  if (pFD->pFD == NULL) {
7,131,816!
32
    if (tsS3Enabled && pFD->lcn > 1 && !strncmp(path + strlen(path) - 5, ".data", 5)) {
×
33
      char lc_path[TSDB_FILENAME_LEN];
34
      tstrncpy(lc_path, path, TSDB_FQDN_LEN);
×
35

36
      int32_t     vid = 0;
×
37
      const char *object_name = taosDirEntryBaseName((char *)path);
×
38
      (void)sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid);
×
39

40
      char *dot = strrchr(lc_path, '.');
×
41
      if (!dot) {
×
42
        tsdbError("unexpected path: %s", lc_path);
×
43
        TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
×
44
      }
45
      snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lc_path), "%d.data", pFD->lcn);
×
46

47
      pFD->pFD = taosOpenFile(lc_path, flag);
×
48
      if (pFD->pFD == NULL) {
×
49
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
50
      }
51
      if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
×
52
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
53
      }
54
    } else {
55
      tsdbInfo("no file: %s", path);
×
56
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
57
    }
58
    pFD->s3File = 1;
×
59
  }
60

61
  pFD->pBuf = taosMemoryCalloc(1, szPage);
7,131,816✔
62
  if (pFD->pBuf == NULL) {
7,133,970!
63
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
64
  }
65

66
  if (lc_size > 0) {
7,133,970!
67
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
68
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
×
69

70
    pFD->szFile = lc_size + chunksize * (pFD->lcn - 1);
×
71
  }
72

73
  // not check file size when reading data files.
74
  if (flag != TD_FILE_READ /* && !pFD->s3File*/) {
7,133,970✔
75
    if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
166,616!
76
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
77
    }
78
  }
79

80
  if (pFD->szFile % szPage != 0) {
7,135,539!
81
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
82
  }
83
  pFD->szFile = pFD->szFile / szPage;
7,135,539✔
84

85
_exit:
7,135,539✔
86
  if (code) {
7,135,539!
87
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
88
  }
89
  return code;
7,136,797✔
90
}
91

92
// =============== PAGE-WISE FILE ===============
93
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
7,320,550✔
94
  int32_t  code = 0;
7,320,550✔
95
  int32_t  lino;
96
  STsdbFD *pFD = NULL;
7,320,550✔
97
  int32_t  szPage = pTsdb->pVnode->config.tsdbPageSize;
7,320,550✔
98

99
  *ppFD = NULL;
7,320,550✔
100

101
  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
7,320,550✔
102
  if (pFD == NULL) {
7,319,364!
103
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
104
  }
105

106
  pFD->path = (char *)&pFD[1];
7,319,515✔
107
  strcpy(pFD->path, path);
7,319,515✔
108
  pFD->szPage = szPage;
7,319,515✔
109
  pFD->flag = flag;
7,319,515✔
110
  pFD->szPage = szPage;
7,319,515✔
111
  pFD->pgno = 0;
7,319,515✔
112
  pFD->lcn = lcn;
7,319,515✔
113
  pFD->pTsdb = pTsdb;
7,319,515✔
114

115
  *ppFD = pFD;
7,319,515✔
116

117
_exit:
7,319,515✔
118
  if (code) {
7,319,515!
119
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
×
120
  }
121
  return code;
7,318,027✔
122
}
123

124
void tsdbCloseFile(STsdbFD **ppFD) {
7,324,027✔
125
  STsdbFD *pFD = *ppFD;
7,324,027✔
126
  if (pFD) {
7,324,027!
127
    taosMemoryFree(pFD->pBuf);
7,324,802✔
128
    int32_t code = taosCloseFile(&pFD->pFD);
7,326,412✔
129
    if (code) {
7,326,566!
130
      tsdbError("failed to close file: %s, code:%d reason:%s", pFD->path, code, tstrerror(code));
×
131
    } else {
132
      tsdbTrace("close file: %s", pFD->path);
7,326,566✔
133
    }
134
    taosMemoryFree(pFD);
7,326,566✔
135
    *ppFD = NULL;
7,327,181✔
136
  }
137
}
7,326,406✔
138

139
static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
4,942,893✔
140
  int32_t code = 0;
4,942,893✔
141
  int32_t lino;
142

143
  if (!pFD->pFD) {
4,942,893✔
144
    code = tsdbOpenFileImpl(pFD);
166,617✔
145
    TSDB_CHECK_CODE(code, lino, _exit);
166,577!
146
  }
147

148
  if (pFD->pgno > 0) {
4,942,853✔
149
    int64_t offset = PAGE_OFFSET(pFD->pgno, pFD->szPage);
4,776,327✔
150
    if (pFD->s3File && pFD->lcn > 1) {
4,776,327!
151
      SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
152
      int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
×
153
      int64_t    chunkoffset = chunksize * (pFD->lcn - 1);
×
154

155
      offset -= chunkoffset;
×
156
    }
157

158
    int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
4,776,327✔
159
    if (n < 0) {
4,776,410!
160
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
161
    }
162

163
    code = taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
4,776,417!
164
    TSDB_CHECK_CODE(code, lino, _exit);
4,776,406!
165

166
    if (encryptAlgorithm == DND_CA_SM4) {
4,776,406!
167
      // if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_TSDB) == DND_CS_TSDB){
168
      unsigned char PacketData[128];
169
      int           NewLen;
170
      int32_t       count = 0;
×
171
      while (count < pFD->szPage) {
×
172
        SCryptOpts opts = {0};
×
173
        opts.len = 128;
×
174
        opts.source = pFD->pBuf + count;
×
175
        opts.result = PacketData;
×
176
        opts.unitLen = 128;
×
177
        // strncpy(opts.key, tsEncryptKey, 16);
178
        strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
×
179

180
        NewLen = CBC_Encrypt(&opts);
×
181

182
        memcpy(pFD->pBuf + count, PacketData, NewLen);
×
183
        count += NewLen;
×
184
      }
185
      // tsdbDebug("CBC_Encrypt count:%d %s", count, __FUNCTION__);
186
    }
187

188
    n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
4,776,406✔
189
    if (n < 0) {
4,776,397!
190
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
191
    }
192

193
    if (pFD->szFile < pFD->pgno) {
4,776,435✔
194
      pFD->szFile = pFD->pgno;
4,774,100✔
195
    }
196
  }
197
  pFD->pgno = 0;
4,942,961✔
198

199
_exit:
4,942,961✔
200
  if (code) {
4,942,961!
201
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
202
  }
203
  return code;
4,942,985✔
204
}
205

206
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgorithm, char *encryptKey) {
24,956,770✔
207
  int32_t code = 0;
24,956,770✔
208
  int32_t lino;
209

210
  if (!pFD->pFD) {
24,956,770!
211
    code = tsdbOpenFileImpl(pFD);
×
212
    TSDB_CHECK_CODE(code, lino, _exit);
×
213
  }
214

215
  int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
24,956,770✔
216
  if (pFD->lcn > 1) {
24,956,770!
217
    SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
218
    int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
×
219
    int64_t    chunkoffset = chunksize * (pFD->lcn - 1);
×
220

221
    offset -= chunkoffset;
×
222
  }
223

224
  // seek
225
  int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
24,956,770✔
226
  if (n < 0) {
24,963,653!
227
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
228
  }
229

230
  // read
231
  n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
24,963,653✔
232
  if (n < 0) {
24,965,258✔
233
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
6,526!
234
  } else if (n < pFD->szPage) {
24,958,732!
235
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
236
  }
237
  //}
238

239
  if (encryptAlgorithm == DND_CA_SM4) {
24,958,732!
240
    // if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_TSDB) == DND_CS_TSDB){
241
    unsigned char PacketData[128];
242
    int           NewLen;
243

244
    int32_t count = 0;
×
245
    while (count < pFD->szPage) {
×
246
      SCryptOpts opts = {0};
×
247
      opts.len = 128;
×
248
      opts.source = pFD->pBuf + count;
×
249
      opts.result = PacketData;
×
250
      opts.unitLen = 128;
×
251
      // strncpy(opts.key, tsEncryptKey, 16);
252
      strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
×
253

254
      NewLen = CBC_Decrypt(&opts);
×
255

256
      memcpy(pFD->pBuf + count, PacketData, NewLen);
×
257
      count += NewLen;
×
258
    }
259
    // tsdbDebug("CBC_Decrypt count:%d %s", count, __FUNCTION__);
260
  }
261

262
  // check
263
  if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
46,015,786!
264
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
265
  }
266

267
  pFD->pgno = pgno;
24,964,747✔
268

269
_exit:
24,964,747✔
270
  if (code) {
24,964,747!
271
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
272
  }
273
  return code;
24,963,729✔
274
}
275

276
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
5,440,409✔
277
                      char *encryptKey) {
278
  int32_t code = 0;
5,440,409✔
279
  int32_t lino;
280
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
5,440,409✔
281
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
5,440,409✔
282
  int64_t bOffset = fOffset % pFD->szPage;
5,440,409✔
283
  int64_t n = 0;
5,440,409✔
284

285
  do {
286
    if (pFD->pgno != pgno) {
10,050,111✔
287
      code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
4,776,371✔
288
      TSDB_CHECK_CODE(code, lino, _exit);
4,776,315!
289

290
      if (pgno <= pFD->szFile) {
4,776,315✔
291
        code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
2,288✔
292
        TSDB_CHECK_CODE(code, lino, _exit);
2,288!
293
      } else {
294
        pFD->pgno = pgno;
4,774,027✔
295
      }
296
    }
297

298
    int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
10,050,055✔
299
    memcpy(pFD->pBuf + bOffset, pBuf + n, nWrite);
10,050,055✔
300

301
    pgno++;
10,050,055✔
302
    bOffset = 0;
10,050,055✔
303
    n += nWrite;
10,050,055✔
304
  } while (n < size);
10,050,055✔
305

306
_exit:
5,440,353✔
307
  if (code) {
5,440,353!
308
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
309
  }
310
  return code;
5,440,175✔
311
}
312

313
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
60,107,136✔
314
                               char *encryptKey) {
315
  int32_t code = 0;
60,107,136✔
316
  int32_t lino;
317
  int64_t n = 0;
60,107,136✔
318
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
60,107,136✔
319
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
60,107,136✔
320
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
60,107,136✔
321
  int64_t bOffset = fOffset % pFD->szPage;
60,107,136✔
322

323
  if (bOffset >= szPgCont) {
60,107,136!
324
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
325
  }
326

327
  while (n < size) {
130,220,184✔
328
    if (pFD->pgno != pgno) {
70,110,527✔
329
      code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
24,959,291✔
330
      TSDB_CHECK_CODE(code, lino, _exit);
24,961,812!
331
    }
332

333
    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
70,113,048✔
334
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
70,113,048✔
335

336
    n += nRead;
70,113,048✔
337
    pgno++;
70,113,048✔
338
    bOffset = 0;
70,113,048✔
339
  }
340

341
_exit:
60,109,657✔
342
  if (code) {
60,109,657!
343
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
344
  }
345
  return code;
60,093,437✔
346
}
347

348
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
×
349
  int32_t    code = 0;
×
350
  int32_t    lino;
351
  SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
×
352
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
×
353
  int64_t    cOffset = offset % chunksize;
×
354
  int64_t    n = 0;
×
355
  char      *buf = NULL;
×
356

357
  char   *object_name = taosDirEntryBaseName(pFD->path);
×
358
  char    object_name_prefix[TSDB_FILENAME_LEN];
359
  int32_t node_id = vnodeNodeId(pFD->pTsdb->pVnode);
×
360
  snprintf(object_name_prefix, TSDB_FQDN_LEN, "%d/%s", node_id, object_name);
×
361

362
  char *dot = strrchr(object_name_prefix, '.');
×
363
  if (!dot) {
×
364
    tsdbError("unexpected path: %s", object_name_prefix);
×
365
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
×
366
  }
367

368
  buf = taosMemoryCalloc(1, size);
×
369
  if (buf == NULL) {
×
370
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
371
  }
372

373
  for (int32_t chunkno = offset / chunksize + 1; n < size; ++chunkno) {
×
374
    int64_t nRead = TMIN(chunksize - cOffset, size - n);
×
375

376
    if (chunkno >= pFD->lcn) {
×
377
      // read last chunk
378
      int64_t ret = taosLSeekFile(pFD->pFD, chunksize * (chunkno - pFD->lcn) + cOffset, SEEK_SET);
×
379
      if (ret < 0) {
×
380
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
381
      }
382

383
      ret = taosReadFile(pFD->pFD, buf + n, nRead);
×
384
      if (ret < 0) {
×
385
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
386
      } else if (ret < nRead) {
×
387
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
388
      }
389
    } else {
390
      uint8_t *pBlock = NULL;
×
391

392
      snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", chunkno);
×
393

394
      code = tcsGetObjectBlock(object_name_prefix, cOffset, nRead, check, &pBlock);
×
395
      TSDB_CHECK_CODE(code, lino, _exit);
×
396

397
      memcpy(buf + n, pBlock, nRead);
×
398
      taosMemoryFree(pBlock);
×
399
    }
400

401
    n += nRead;
×
402
    cOffset = 0;
×
403
  }
404

405
_exit:
×
406
  if (code) {
×
407
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
408
    taosMemoryFree(buf);
×
409
  } else {
410
    *ppBlock = (uint8_t *)buf;
×
411
  }
412
  return code;
×
413
}
414

415
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
×
416
  int32_t code = 0;
×
417
  int32_t lino;
418
  int64_t n = 0;
×
419
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
×
420
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
×
421
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
×
422
  int64_t bOffset = fOffset % pFD->szPage;
×
423

424
  if (bOffset >= szPgCont) {
×
425
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
426
  }
427

428
  // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
429
  // 2, fetch pgnoStart ~ pgnoEnd from s3
430
  // 3, store pgs to pcache & last pg to pFD->pBuf
431
  // 4, deliver pgs to [pBuf, pBuf + size)
432

433
  while (n < size) {
×
434
    if (pFD->pgno != pgno) {
×
435
      LRUHandle *handle = NULL;
×
436
      code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
×
437
      if (code != TSDB_CODE_SUCCESS) {
×
438
        if (handle) {
×
439
          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
440
        }
441
        TSDB_CHECK_CODE(code, lino, _exit);
×
442
      }
443

444
      if (!handle) {
×
445
        break;
×
446
      }
447

448
      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
×
449
      memcpy(pFD->pBuf, pPage, pFD->szPage);
×
450
      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
451

452
      // check
453
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
×
454
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
455
      }
456

457
      pFD->pgno = pgno;
×
458
    }
459

460
    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
×
461
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
×
462

463
    n += nRead;
×
464
    ++pgno;
×
465
    bOffset = 0;
×
466
  }
467

468
  if (n < size) {
×
469
    // 2, retrieve pgs from s3
470
    uint8_t *pBlock = NULL;
×
471
    int64_t  retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
×
472
    int64_t  pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;
×
473

474
    if (szHint > 0) {
×
475
      pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
×
476
    }
477

478
    int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
×
479

480
    code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, 1, &pBlock);
×
481
    TSDB_CHECK_CODE(code, lino, _exit);
×
482
    // 3, Store Pages in Cache
483
    int nPage = pgnoEnd - pgno + 1;
×
484
    for (int i = 0; i < nPage; ++i) {
×
485
      if (pFD->szFile != pgno) {  // DONOT cache last volatile page
×
486
        tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
×
487
      }
488

489
      if (szHint > 0 && n >= size) {
×
490
        ++pgno;
×
491
        continue;
×
492
      }
493
      memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
×
494

495
      // check
496
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
×
497
        TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
498
      }
499

500
      pFD->pgno = pgno;
×
501

502
      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
×
503
      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
×
504

505
      n += nRead;
×
506
      ++pgno;
×
507
      bOffset = 0;
×
508
    }
509

510
    taosMemoryFree(pBlock);
×
511
  }
512

513
_exit:
×
514
  if (code) {
×
515
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
516
  }
517
  return code;
×
518
}
519

520
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
60,105,446✔
521
                     int32_t encryptAlgorithm, char *encryptKey) {
522
  int32_t code = 0;
60,105,446✔
523
  int32_t lino;
524

525
  if (!pFD->pFD) {
60,105,446✔
526
    code = tsdbOpenFileImpl(pFD);
6,968,491✔
527
    TSDB_CHECK_CODE(code, lino, _exit);
6,969,234!
528
  }
529

530
  if (pFD->s3File && pFD->lcn > 1 /* && tsS3BlockSize < 0*/) {
60,106,189!
531
    code = tsdbReadFileS3(pFD, offset, pBuf, size, szHint);
×
532
    TSDB_CHECK_CODE(code, lino, _exit);
×
533
  } else {
534
    code = tsdbReadFileImp(pFD, offset, pBuf, size, encryptAlgorithm, encryptKey);
60,106,189✔
535
    TSDB_CHECK_CODE(code, lino, _exit);
60,093,507!
536
  }
537

538
_exit:
60,093,507✔
539
  if (code) {
60,093,507!
540
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
541
  }
542
  return code;
60,093,426✔
543
}
544

545
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
39,704,546✔
546
                             int32_t encryptAlgorithm, char *encryptKey) {
547
  int32_t code;
548
  int32_t lino;
549

550
  code = tBufferEnsureCapacity(buffer, buffer->size + size);
39,704,546✔
551
  TSDB_CHECK_CODE(code, lino, _exit);
39,708,208!
552

553
  code = tsdbReadFile(pFD, offset, (uint8_t *)tBufferGetDataEnd(buffer), size, szHint, encryptAlgorithm, encryptKey);
39,708,208✔
554
  TSDB_CHECK_CODE(code, lino, _exit);
39,712,595!
555

556
  buffer->size += size;
39,712,595✔
557

558
_exit:
39,712,595✔
559
  if (code) {
39,712,595!
560
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
561
  }
562
  return code;
39,712,504✔
563
}
564

565
int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
166,604✔
566
  int32_t code = 0;
166,604✔
567
  int32_t lino;
568

569
  code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
166,604✔
570
  TSDB_CHECK_CODE(code, lino, _exit);
166,653!
571

572
  if (taosFsyncFile(pFD->pFD) < 0) {
166,653!
573
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
×
574
  }
575

576
_exit:
166,632✔
577
  if (code) {
166,632!
578
    TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
×
579
  }
580
  return code;
166,633✔
581
}
582

583
// SDataFReader ====================================================
584
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
×
585
  int32_t       code = 0;
×
586
  int32_t       lino = 0;
×
587
  SDataFReader *pReader = NULL;
×
588
  int32_t       szPage = pTsdb->pVnode->config.tsdbPageSize;
×
589
  char          fname[TSDB_FILENAME_LEN];
590

591
  // alloc
592
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
×
593
  if (pReader == NULL) {
×
594
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
595
  }
596
  pReader->pTsdb = pTsdb;
×
597
  pReader->pSet = pSet;
×
598

599
  // head
600
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
×
601
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD, 0);
×
602
  TSDB_CHECK_CODE(code, lino, _exit);
×
603

604
  // data
605
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
×
606
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD, 0);
×
607
  TSDB_CHECK_CODE(code, lino, _exit);
×
608

609
  // sma
610
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
×
611
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD, 0);
×
612
  TSDB_CHECK_CODE(code, lino, _exit);
×
613

614
  // stt
615
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
×
616
    tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
×
617
    code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt], 0);
×
618
    TSDB_CHECK_CODE(code, lino, _exit);
×
619
  }
620

621
_exit:
×
622
  if (code) {
×
623
    *ppReader = NULL;
×
624
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
625

626
    if (pReader) {
×
627
      for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
×
628
      tsdbCloseFile(&pReader->pSmaFD);
×
629
      tsdbCloseFile(&pReader->pDataFD);
×
630
      tsdbCloseFile(&pReader->pHeadFD);
×
631
      taosMemoryFree(pReader);
×
632
    }
633
  } else {
634
    *ppReader = pReader;
×
635
  }
636
  return code;
×
637
}
638

639
void tsdbDataFReaderClose(SDataFReader **ppReader) {
×
640
  if (*ppReader == NULL) return;
×
641

642
  // head
643
  tsdbCloseFile(&(*ppReader)->pHeadFD);
×
644

645
  // data
646
  tsdbCloseFile(&(*ppReader)->pDataFD);
×
647

648
  // sma
649
  tsdbCloseFile(&(*ppReader)->pSmaFD);
×
650

651
  // stt
652
  for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
×
653
    if ((*ppReader)->aSttFD[iStt]) {
×
654
      tsdbCloseFile(&(*ppReader)->aSttFD[iStt]);
×
655
    }
656
  }
657

658
  for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) {
×
659
    tFree((*ppReader)->aBuf[iBuf]);
×
660
  }
661
  taosMemoryFree(*ppReader);
×
662
  *ppReader = NULL;
×
663
}
664

665
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
×
666
  int32_t    code = 0;
×
667
  int32_t    lino;
668
  SHeadFile *pHeadFile = pReader->pSet->pHeadF;
×
669
  int64_t    offset = pHeadFile->offset;
×
670
  int64_t    size = pHeadFile->size - offset;
×
671

672
  taosArrayClear(aBlockIdx);
×
673
  if (size == 0) {
×
674
    return code;
×
675
  }
676

677
  // alloc
678
  code = tRealloc(&pReader->aBuf[0], size);
×
679
  TSDB_CHECK_CODE(code, lino, _exit);
×
680

681
  // read
682
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
683
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
684
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
685
  TSDB_CHECK_CODE(code, lino, _exit);
×
686

687
  // decode
688
  int64_t n = 0;
×
689
  while (n < size) {
×
690
    SBlockIdx blockIdx;
691
    n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
×
692

693
    if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
×
694
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
695
    }
696
  }
697
  if (n != size) {
×
698
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
699
  }
700

701
_exit:
×
702
  if (code) {
×
703
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
704
  }
705
  return code;
×
706
}
707

708
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
×
709
  int32_t   code = 0;
×
710
  int32_t   lino;
711
  SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
×
712
  int64_t   offset = pSttFile->offset;
×
713
  int64_t   size = pSttFile->size - offset;
×
714

715
  taosArrayClear(aSttBlk);
×
716
  if (size == 0) {
×
717
    return 0;
×
718
  }
719

720
  // alloc
721
  code = tRealloc(&pReader->aBuf[0], size);
×
722
  TSDB_CHECK_CODE(code, lino, _exit);
×
723

724
  // read
725
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
726
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
727
  code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
728
  TSDB_CHECK_CODE(code, lino, _exit);
×
729

730
  // decode
731
  int64_t n = 0;
×
732
  while (n < size) {
×
733
    SSttBlk sttBlk;
734
    n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk);
×
735

736
    if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
×
737
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
738
    }
739
  }
740
  if (n != size) {
×
741
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
742
  }
743

744
_exit:
×
745
  if (code) {
×
746
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
747
  }
748
  return code;
×
749
}
750

751
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
×
752
  int32_t code = 0;
×
753
  int32_t lino;
754
  int64_t offset = pBlockIdx->offset;
×
755
  int64_t size = pBlockIdx->size;
×
756

757
  // alloc
758
  code = tRealloc(&pReader->aBuf[0], size);
×
759
  TSDB_CHECK_CODE(code, lino, _exit);
×
760

761
  // read
762
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
763
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
764
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
765
  TSDB_CHECK_CODE(code, lino, _exit);
×
766

767
  // decode
768
  int32_t n;
769
  code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
×
770
  if (code) goto _exit;
×
771
  if (n != size) {
×
772
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
773
  }
774

775
_exit:
×
776
  if (code) {
×
777
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
778
  }
779
  return code;
×
780
}
781

782
// SDelFReader ====================================================
783
struct SDelFReader {
784
  STsdb   *pTsdb;
785
  SDelFile fDel;
786
  STsdbFD *pReadH;
787
  uint8_t *aBuf[1];
788
};
789

790
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
×
791
  int32_t      code = 0;
×
792
  int32_t      lino = 0;
×
793
  char         fname[TSDB_FILENAME_LEN];
794
  SDelFReader *pDelFReader = NULL;
×
795

796
  // alloc
797
  pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
×
798
  if (pDelFReader == NULL) {
×
799
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
800
  }
801

802
  // open impl
803
  pDelFReader->pTsdb = pTsdb;
×
804
  pDelFReader->fDel = *pFile;
×
805

806
  tsdbDelFileName(pTsdb, pFile, fname);
×
807
  code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH, 0);
×
808
  TSDB_CHECK_CODE(code, lino, _exit);
×
809

810
_exit:
×
811
  if (code) {
×
812
    *ppReader = NULL;
×
813
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
×
814
    taosMemoryFree(pDelFReader);
×
815
  } else {
816
    *ppReader = pDelFReader;
×
817
  }
818
  return code;
×
819
}
820

821
void tsdbDelFReaderClose(SDelFReader **ppReader) {
×
822
  int32_t      code = 0;
×
823
  SDelFReader *pReader = *ppReader;
×
824

825
  if (pReader) {
×
826
    tsdbCloseFile(&pReader->pReadH);
×
827
    for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
×
828
      tFree(pReader->aBuf[iBuf]);
×
829
    }
830
    taosMemoryFree(pReader);
×
831
  }
832

833
  *ppReader = NULL;
×
834
}
×
835

836
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
837
  return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, INT64_MAX);
×
838
}
839

840
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
×
841
  int32_t code = 0;
×
842
  int32_t lino;
843
  int64_t offset = pDelIdx->offset;
×
844
  int64_t size = pDelIdx->size;
×
845
  int64_t n;
846

847
  taosArrayClear(aDelData);
×
848

849
  // alloc
850
  code = tRealloc(&pReader->aBuf[0], size);
×
851
  TSDB_CHECK_CODE(code, lino, _exit);
×
852

853
  // read
854
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
855
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
856
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
857
  TSDB_CHECK_CODE(code, lino, _exit);
×
858

859
  // // decode
860
  n = 0;
×
861
  while (n < size) {
×
862
    SDelData delData;
863
    n += tGetDelData(pReader->aBuf[0] + n, &delData);
×
864

865
    if (delData.version > maxVer) {
×
866
      continue;
×
867
    }
868
    if (taosArrayPush(aDelData, &delData) == NULL) {
×
869
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
870
    }
871
  }
872

873
  if (n != size) {
×
874
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
875
  }
876

877
_exit:
×
878
  if (code) {
×
879
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
880
  }
881
  return code;
×
882
}
883

884
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
×
885
  int32_t code = 0;
×
886
  int32_t lino;
887
  int32_t n;
888
  int64_t offset = pReader->fDel.offset;
×
889
  int64_t size = pReader->fDel.size - offset;
×
890

891
  taosArrayClear(aDelIdx);
×
892

893
  // alloc
894
  code = tRealloc(&pReader->aBuf[0], size);
×
895
  TSDB_CHECK_CODE(code, lino, _exit);
×
896

897
  // read
898
  int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
×
899
  char   *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
×
900
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
×
901
  TSDB_CHECK_CODE(code, lino, _exit);
×
902

903
  // decode
904
  n = 0;
×
905
  while (n < size) {
×
906
    SDelIdx delIdx;
907

908
    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
×
909

910
    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
×
911
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
912
    }
913
  }
914

915
  if (n != size) {
×
916
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
917
  }
918

919
_exit:
×
920
  if (code) {
×
921
    TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
×
922
  }
923
  return code;
×
924
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc