• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.1
/source/libs/tdb/src/db/tdbPager.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "tdbInt.h"
18
#include "tglobal.h"
19

20
struct hashset_st {
21
  size_t  nbits;
22
  size_t  mask;
23
  size_t  capacity;
24
  size_t *items;
25
  size_t  nitems;
26
  double  load_factor;
27
};
28

29
static const unsigned int prime = 39;
30
static const unsigned int prime2 = 5009;
31

32
static hashset_t hashset_create(void) {
430,320✔
33
  hashset_t set = tdbOsCalloc(1, sizeof(struct hashset_st));
430,320!
34
  if (!set) {
430,510!
35
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
36
    return NULL;
×
37
  }
38

39
  set->nbits = 4;
430,510✔
40
  set->capacity = (size_t)(1 << set->nbits);
430,510✔
41
  set->items = tdbOsCalloc(set->capacity, sizeof(size_t));
430,510!
42
  if (!set->items) {
430,426!
43
    tdbOsFree(set);
×
44
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
45
    return NULL;
×
46
  }
47
  set->mask = set->capacity - 1;
430,426✔
48
  set->nitems = 0;
430,426✔
49

50
  set->load_factor = 0.75;
430,426✔
51

52
  return set;
430,426✔
53
}
54

55
void hashset_destroy(hashset_t set) {
430,546✔
56
  if (set) {
430,546!
57
    tdbOsFree(set->items);
430,559!
58
    tdbOsFree(set);
430,582!
59
  }
60
}
430,573✔
61

62
static int hashset_add_member(hashset_t set, void *item) {
455,892✔
63
  size_t value = (size_t)item;
455,892✔
64
  size_t h;
65

66
  if (value == 0) {
455,892✔
67
    return -1;
1,940✔
68
  }
69

70
  for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
457,840✔
71
    if (set->items[h] == value) {
3,888!
72
      return 0;
×
73
    }
74
  }
75

76
  set->items[h] = value;
453,952✔
77
  ++set->nitems;
453,952✔
78
  return 1;
453,952✔
79
}
80

81
static int hashset_add(hashset_t set, void *item) {
448,206✔
82
  int ret = hashset_add_member(set, item);
448,206✔
83

84
  size_t old_capacity = set->capacity;
448,218✔
85
  if (set->nitems >= (double)old_capacity * set->load_factor) {
448,218✔
86
    size_t *old_items = set->items;
430✔
87
    ++set->nbits;
430✔
88
    set->capacity = (size_t)(1 << set->nbits);
430✔
89
    set->mask = set->capacity - 1;
430✔
90

91
    set->items = tdbOsCalloc(set->capacity, sizeof(size_t));
430!
92
    if (!set->items) {
430!
93
      return -1;
×
94
    }
95

96
    set->nitems = 0;
430✔
97
    for (size_t i = 0; i < old_capacity; ++i) {
8,191✔
98
      int nt = hashset_add_member(set, (void *)old_items[i]);
7,761✔
99
    }
100
    tdbOsFree(old_items);
430!
101
  }
102

103
  return ret;
448,063✔
104
}
105

106
static int hashset_remove(hashset_t set, void *item) {
834,787✔
107
  size_t value = (size_t)item;
834,787✔
108

109
  for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
842,890✔
110
    if (set->items[h] == value) {
455,525✔
111
      set->items[h] = 0;
447,422✔
112
      --set->nitems;
447,422✔
113
      return 1;
447,422✔
114
    }
115
  }
116

117
  return 0;
387,365✔
118
}
119

120
static int hashset_contains(hashset_t set, void *item) {
536,993✔
121
  size_t value = (size_t)item;
536,993✔
122

123
  for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
541,468✔
124
    if (set->items[h] == value) {
93,505✔
125
      return 1;
89,030✔
126
    }
127
  }
128

129
  return 0;
447,963✔
130
}
131

132
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
133

134
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
135
                            u8 loadPage);
136
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
137
static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage);
138

139
static FORCE_INLINE int32_t pageCmpFn(const SRBTreeNode *lhs, const SRBTreeNode *rhs) {
11,566,373✔
140
  SPage *pPageL = (SPage *)(((uint8_t *)lhs) - offsetof(SPage, node));
11,566,373✔
141
  SPage *pPageR = (SPage *)(((uint8_t *)rhs) - offsetof(SPage, node));
11,566,373✔
142

143
  SPgno pgnoL = TDB_PAGE_PGNO(pPageL);
11,566,373✔
144
  SPgno pgnoR = TDB_PAGE_PGNO(pPageR);
11,566,373✔
145

146
  if (pgnoL < pgnoR) {
11,566,373✔
147
    return -1;
2,556,033✔
148
  } else if (pgnoL > pgnoR) {
9,010,340!
149
    return 1;
9,012,826✔
150
  } else {
151
    return 0;
×
152
  }
153
}
154

155
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
46,922✔
156
  uint8_t *pPtr;
157
  SPager  *pPager;
158
  int      fsize;
159
  int      zsize;
160
  int      ret;
161

162
  *ppPager = NULL;
46,922✔
163

164
  fsize = strlen(fileName);
46,922✔
165
  zsize = sizeof(*pPager)  /* SPager */
46,922✔
166
          + fsize + 1      /* dbFileName */
167
          + fsize + 8 + 1; /* jFileName */
46,922✔
168
  pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
46,922!
169
  if (pPtr == NULL) {
47,034!
170
    return terrno;
×
171
  }
172

173
  pPager = (SPager *)pPtr;
47,034✔
174
  pPtr += sizeof(*pPager);
47,034✔
175
  // pPager->dbFileName
176
  pPager->dbFileName = (char *)pPtr;
47,034✔
177
  memcpy(pPager->dbFileName, fileName, fsize);
47,034✔
178
  pPager->dbFileName[fsize] = '\0';
47,034✔
179
  pPtr += fsize + 1;
47,034✔
180
  // pPager->jFileName
181
  pPager->jFileName = (char *)pPtr;
47,034✔
182
  memcpy(pPager->jFileName, fileName, fsize);
47,034✔
183
  memcpy(pPager->jFileName + fsize, "-journal", 8);
47,034✔
184
  pPager->jFileName[fsize + 8] = '\0';
47,034✔
185
  // pPager->pCache
186
  pPager->pCache = pCache;
47,034✔
187

188
  pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
47,034✔
189
  if (TDB_FD_INVALID(pPager->fd)) {
47,004!
190
    // if (pPager->fd < 0) {
191
    return TAOS_SYSTEM_ERROR(ERRNO);
×
192
  }
193

194
  ret = tdbGnrtFileID(pPager->fd, pPager->fid, false);
47,004✔
195
  if (ret < 0) {
46,997!
196
    return TAOS_SYSTEM_ERROR(ERRNO);
×
197
  }
198

199
  // pPager->jfd = -1;
200
  pPager->pageSize = tdbPCacheGetPageSize(pCache);
46,997✔
201
  // pPager->dbOrigSize
202
  ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
47,028✔
203
  pPager->dbFileSize = pPager->dbOrigSize;
47,026✔
204

205
  tdbTrace("pager/open reset dirty tree: %p", &pPager->rbt);
47,026✔
206
  tRBTreeCreate(&pPager->rbt, pageCmpFn);
47,026✔
207

208
  *ppPager = pPager;
47,000✔
209
  return 0;
47,000✔
210
}
211

212
void tdbPagerClose(SPager *pPager) {
47,042✔
213
  if (pPager) {
47,042!
214
    int32_t code = tdbOsClose(pPager->fd);
47,043✔
215
    if (code) {
47,044!
216
      tdbWarn("failed to close file since %s", tstrerror(code));
×
217
    }
218
    tdbOsFree(pPager);
47,044!
219
  }
220
  return;
47,044✔
221
}
222

223
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
3,819,964✔
224
  int     ret;
225
  SPage **ppPage;
226

227
  if (pPage->isDirty) return 0;
3,819,964✔
228

229
  // ref page one more time so the page will not be release
230
  int32_t nRef = tdbRefPage(pPage);
1,234,656✔
231
  tdbTrace("pager/mdirty page %p/%d/%d, ref:%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
1,235,802✔
232

233
  // Set page as dirty
234
  pPage->isDirty = 1;
1,235,802✔
235

236
  tdbTrace("tdb/pager-write: put page: %p %d to dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
1,235,802✔
237
  SRBTreeNode *tnode = tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
1,235,802✔
238

239
  // Write page to journal if neccessary
240
  if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize &&
1,235,257✔
241
      (pPager->pActiveTxn->jPageSet == NULL ||
1,074,007!
242
       !hashset_contains(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))))) {
537,083✔
243
    ret = tdbPagerWritePageToJournal(pPager, pPage);
447,917✔
244
    if (ret < 0) {
448,249!
245
      tdbError("failed to write page to journal since %s", tstrerror(ret));
×
246
      return ret;
×
247
    }
248

249
    if (pPager->pActiveTxn->jPageSet) {
448,249!
250
      int32_t nt = hashset_add(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
448,349✔
251
    }
252
  }
253

254
  return 0;
1,235,287✔
255
}
256

257
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
430,486✔
258
  /*
259
  if (pPager->inTran) {
260
    return 0;
261
  }
262
  */
263
  // Open the journal
264
  char jTxnFileName[TDB_FILENAME_LEN];
265
  (void)tsnprintf(jTxnFileName, TDB_FILENAME_LEN, "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
430,486✔
266
  pTxn->jfd = tdbOsOpen(jTxnFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
430,514✔
267
  if (TDB_FD_INVALID(pTxn->jfd)) {
430,438!
268
    tdbError("failed to open file due to %s. jFileName:%s", strerror(ERRNO), pPager->jFileName);
×
269
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
270
  }
271

272
  pTxn->jPageSet = hashset_create();
430,438✔
273
  if (pTxn->jPageSet == NULL) {
430,375!
274
    return terrno;
×
275
  }
276

277
  pPager->pActiveTxn = pTxn;
430,375✔
278

279
  tdbDebug("pager/begin: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
430,375✔
280

281
  // TODO: write the size of the file
282
  /*
283
  pPager->inTran = 1;
284
  */
285
  return 0;
430,478✔
286
}
287
/*
288
int tdbPagerCancelDirty(SPager *pPager, SPage *pPage, TXN *pTxn) {
289
  SRBTreeNode *pNode = tRBTreeGet(&pPager->rbt, (SRBTreeNode *)pPage);
290
  if (pNode) {
291
    pPage->isDirty = 0;
292

293
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
294
    if (pTxn->jPageSet) {
295
      hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
296
    }
297

298
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
299
  }
300

301
  return 0;
302
}
303
*/
304
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
443,168✔
305
  SPage *pPage;
306
  int    ret;
307

308
  // sync the journal file
309
  ret = tdbOsFSync(pTxn->jfd);
443,168✔
310
  if (ret < 0) {
443,370!
311
    tdbError("failed to fsync: %s. jFileName:%s, %" PRId64, strerror(ERRNO), pPager->jFileName, pTxn->txnId);
×
312
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
313
  }
314

315
  // loop to write the dirty pages to file
316
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
443,391✔
317
  SRBTreeNode *pNode = NULL;
443,391✔
318
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
1,224,519✔
319
    pPage = (SPage *)pNode;
781,260✔
320

321
    if (pPage->nOverflow != 0) {
781,260!
322
      tdbError("tdb/pager-commit: %p, pPage: %p, ovfl: %d, commit page failed.", pPager, pPage, pPage->nOverflow);
×
323
      return TSDB_CODE_INVALID_DATA_FMT;
×
324
    }
325

326
    ret = tdbPagerPWritePageToDB(pPager, pPage);
781,260✔
327
    if (ret < 0) {
781,128!
328
      tdbError("failed to write page to db since %s", tstrerror(terrno));
×
329
      return ret;
×
330
    }
331
  }
332

333
  tdbDebug("pager/commit: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
443,107✔
334

335
  pPager->dbOrigSize = pPager->dbFileSize;
443,284✔
336

337
  // release the page
338
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
443,284✔
339
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
1,224,547✔
340
    pPage = (SPage *)pNode;
781,204✔
341

342
    pPage->isDirty = 0;
781,204✔
343

344
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
781,204✔
345
    if (pTxn->jPageSet) {
780,770!
346
      int32_t nt = hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
780,912✔
347
    }
348

349
    tdbTrace("tdb/pager-commit: remove page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
780,764✔
350

351
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
780,764✔
352
  }
353

354
  tdbTrace("tdb/pager-commit reset dirty tree: %p", &pPager->rbt);
443,150✔
355
  tRBTreeCreate(&pPager->rbt, pageCmpFn);
443,150✔
356

357
  // sync the db file
358
  if (tdbOsFSync(pPager->fd) < 0) {
443,226!
359
    tdbError("failed to fsync fd due to %s. file:%s", strerror(ERRNO), pPager->dbFileName);
×
360
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
361
  }
362

363
  return 0;
443,374✔
364
}
365

366
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
398,244✔
367
  char jTxnFileName[TDB_FILENAME_LEN];
368
  (void)tsnprintf(jTxnFileName, TDB_FILENAME_LEN, "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
398,244✔
369

370
  // remove the journal file
371
  if (tdbOsClose(pTxn->jfd) < 0) {
398,221!
372
    tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(ERRNO), pPager->jFileName, pTxn->txnId);
×
373
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
374
  }
375

376
  if (tdbOsRemove(jTxnFileName) < 0 && ERRNO != ENOENT) {
398,263!
377
    tdbError("failed to remove file due to %s. file:%s", strerror(ERRNO), jTxnFileName);
×
378
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
379
  }
380

381
  // pPager->inTran = 0;
382

383
  tdbDebug("pager/post-commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
398,236✔
384

385
  return 0;
398,250✔
386
}
387

388
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
×
389
  SPage *pPage;
390
  SPgno  maxPgno = pPager->dbOrigSize;
×
391
  int    ret;
392

393
  // sync the journal file
394
  ret = tdbOsFSync(pTxn->jfd);
×
395
  if (ret < 0) {
×
396
    tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(ERRNO), pPager->jFileName, pTxn->txnId);
×
397
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
398
  }
399

400
  // loop to write the dirty pages to file
401
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
×
402
  SRBTreeNode *pNode = NULL;
×
403
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
×
404
    pPage = (SPage *)pNode;
×
405
    if (pPage->isLocal) continue;
×
406

407
    SPgno pgno = TDB_PAGE_PGNO(pPage);
×
408
    if (pgno > maxPgno) {
×
409
      maxPgno = pgno;
×
410
    }
411
    ret = tdbPagerPWritePageToDB(pPager, pPage);
×
412
    if (ret < 0) {
×
413
      tdbError("failed to write page to db since %s", tstrerror(terrno));
×
414
      return ret;
×
415
    }
416
  }
417

418
  tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
×
419
  pPager->dbOrigSize = maxPgno;
×
420
  //  pPager->dbOrigSize = pPager->dbFileSize;
421

422
  // release the page
423
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
×
424
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
×
425
    pPage = (SPage *)pNode;
×
426
    if (pPage->isLocal) continue;
×
427
    pPage->isDirty = 0;
×
428

429
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
×
430
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
×
431
  }
432

433
  return 0;
×
434
}
435

436
static char *tdbEncryptPage(SPager *pPager, char *pPageData, int32_t pageSize, const char *function, int64_t offset) {
1,224,860✔
437
  int32_t encryptAlgorithm = pPager->pEnv->encryptAlgorithm;
1,224,860✔
438
  char   *encryptKey = pPager->pEnv->encryptKey;
1,224,860✔
439

440
  char *buf = pPageData;
1,224,860✔
441

442
  if (encryptAlgorithm == DND_CA_SM4) {
1,224,860✔
443
    // tdbInfo("CBC_Encrypt key:%d %s %s", encryptAlgorithm, encryptKey, __FUNCTION__);
444

445
    // tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d before Encrypt", offset, pPage->pData[0]);
446

447
    buf = taosMemoryMalloc(pageSize);
28!
448
    if (buf == NULL) {
28!
449
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
450
      return NULL;
×
451
    }
452

453
    unsigned char packetData[128];
454

455
    int32_t count = 0;
28✔
456
    while (count < pageSize) {
924✔
457
      SCryptOpts opts = {0};
896✔
458
      opts.len = 128;
896✔
459
      opts.source = pPageData + count;
896✔
460
      opts.result = packetData;
896✔
461
      opts.unitLen = 128;
896✔
462
      tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
896✔
463

464
      int32_t newLen = CBC_Encrypt(&opts);
896✔
465

466
      memcpy(buf + count, packetData, newLen);
896✔
467
      count += newLen;
896✔
468
    }
469
    // tdbInfo("CBC tdb offset:%" PRId64 ", Encrypt count:%d %s", offset, count, function);
470

471
    // tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d after Encrypt", offset, (uint8_t)buf[0]);
472
  }
473

474
  return buf;
1,224,860✔
475
}
476

477
void tdbFreeEncryptBuf(SPager *pPager, char *buf) {
1,224,758✔
478
  int32_t encryptAlgorithm = pPager->pEnv->encryptAlgorithm;
1,224,758✔
479
  if (encryptAlgorithm == DND_CA_SM4) taosMemoryFreeClear(buf);
1,224,758!
480
}
1,224,758✔
481
// recovery dirty pages
482
int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
32,308✔
483
  SPage *pPage;
484
  int    pgIdx;
485
  SPgno  journalSize = 0;
32,308✔
486
  int    ret;
487

488
  if (pTxn->jfd == 0) {
32,308!
489
    // txn is commited
490
    return 0;
×
491
  }
492

493
  // sync the journal file
494
  ret = tdbOsFSync(pTxn->jfd);
32,308✔
495
  if (ret < 0) {
32,321!
496
    tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(ERRNO), pPager->jFileName, pTxn->txnId);
×
497
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
498
  }
499

500
  tdb_fd_t jfd = pTxn->jfd;
32,321✔
501

502
  ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
32,321✔
503
  if (ret < 0) {
32,317!
504
    return ret;
×
505
  }
506

507
  if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
32,317!
508
    tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(ERRNO), pPager->dbFileName);
×
509
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
510
  }
511

512
  u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
32,319!
513
  if (pageBuf == NULL) {
32,325!
514
    return terrno;
×
515
  }
516

517
  tdbDebug("pager/abort: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
32,325✔
518

519
  for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
75,116✔
520
    // read pgno & the page from journal
521
    SPgno pgno;
522

523
    int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
42,790✔
524
    if (ret < 0) {
42,790!
525
      tdbOsFree(pageBuf);
×
526
      return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
527
    }
528

529
    tdbTrace("pager/abort: restore pgno:%d,", pgno);
42,790✔
530

531
    tdbPCacheInvalidatePage(pPager->pCache, pPager, pgno);
42,790✔
532

533
    ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
42,790✔
534
    if (ret < 0) {
42,790!
535
      tdbOsFree(pageBuf);
×
536
      return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
537
    }
538

539
    i64 offset = pPager->pageSize * (pgno - 1);
42,790✔
540
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
42,790!
541
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(ERRNO), pPager->dbFileName, offset);
×
542
      tdbOsFree(pageBuf);
×
543
      return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
544
    }
545

546
    char *buf = tdbEncryptPage(pPager, pageBuf, pPager->pageSize, __FUNCTION__, offset);
42,790✔
547
    if (buf == NULL) {
42,790!
548
      return terrno;
×
549
    }
550

551
    ret = tdbOsWrite(pPager->fd, buf, pPager->pageSize);
42,790✔
552
    if (ret < 0) {
42,790!
553
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(ERRNO), pPager->dbFileName,
×
554
               pPager->pageSize);
555
      tdbFreeEncryptBuf(pPager, buf);
×
556
      tdbOsFree(pageBuf);
×
557
      return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
558
    }
559

560
    tdbFreeEncryptBuf(pPager, buf);
42,790✔
561
  }
562

563
  if (tdbOsFSync(pPager->fd) < 0) {
32,326!
564
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(ERRNO), pPager->dbFileName);
×
565
    tdbOsFree(pageBuf);
×
566
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
567
  }
568

569
  tdbOsFree(pageBuf);
32,327!
570

571
  // 3, release the dirty pages
572
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
32,327✔
573
  SRBTreeNode *pNode = NULL;
32,327✔
574
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
86,233✔
575
    pPage = (SPage *)pNode;
53,906✔
576
    SPgno pgno = TDB_PAGE_PGNO(pPage);
53,906✔
577

578
    tdbTrace("pager/abort: drop dirty pgno:%d,", pgno);
53,906✔
579

580
    pPage->isDirty = 0;
53,906✔
581

582
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
53,906✔
583
    int32_t nt = hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
53,906✔
584
    tdbPCacheMarkFree(pPager->pCache, pPage);
53,906✔
585
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
53,906✔
586
  }
587

588
  tdbTrace("pager/abort: reset dirty tree: %p", &pPager->rbt);
32,327✔
589
  tRBTreeCreate(&pPager->rbt, pageCmpFn);
32,327✔
590

591
  // 4, remove the journal file
592
  if (tdbOsClose(pTxn->jfd) < 0) {
32,327!
593
    tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(ERRNO), pPager->jFileName, pTxn->txnId);
×
594
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
595
  }
596

597
  char jTxnFileName[TDB_FILENAME_LEN];
598
  (void)tsnprintf(jTxnFileName, TDB_FILENAME_LEN, "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
32,327✔
599

600
  if (tdbOsRemove(jTxnFileName) < 0 && ERRNO != ENOENT) {
32,325!
601
    tdbError("failed to remove file due to %s. file:%s", strerror(ERRNO), jTxnFileName);
×
602
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
603
  }
604

605
  // pPager->inTran = 0;
606

607
  return 0;
32,325✔
608
}
609

610
int tdbPagerFlushPage(SPager *pPager, TXN *pTxn, bool *flushed) {
400,435✔
611
  SPage *pPage;
612
  i32    nRef;
613
  SPgno  maxPgno = pPager->dbOrigSize;
400,435✔
614
  int    ret;
615

616
  // loop to write the dirty pages to file
617
  SRBTreeIter  iter = tRBTreeIterCreate(&pPager->rbt, 1);
400,435✔
618
  SRBTreeNode *pNode = NULL;
400,435✔
619
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
1,544,660!
620
    pPage = (SPage *)pNode;
1,544,660✔
621
    nRef = tdbGetPageRef(pPage);
1,544,660✔
622
    if (nRef > 1) {
1,544,660✔
623
      continue;
1,144,225✔
624
    }
625

626
    *flushed = true;
400,435✔
627

628
    SPgno pgno = TDB_PAGE_PGNO(pPage);
400,435✔
629
    if (pgno > maxPgno) {
400,435✔
630
      maxPgno = pgno;
311,282✔
631
    }
632
    ret = tdbPagerPWritePageToDB(pPager, pPage);
400,435✔
633
    if (ret < 0) {
400,435!
634
      tdbError("failed to write page to db since %s", tstrerror(terrno));
×
635
      return ret;
×
636
    }
637

638
    tdbTrace("tdb/flush:%p, pgno:%d, %d/%d/%d", pPager, pgno, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
400,435!
639
    pPager->dbOrigSize = maxPgno;
400,435✔
640

641
    pPage->isDirty = 0;
400,435✔
642

643
    tdbTrace("pager/flush drop page: %p, pgno:%d, from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
400,435!
644
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
400,435✔
645
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
400,435✔
646

647
    break;
400,435✔
648
  }
649

650
  tdbDebug("pager/flush: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
400,435!
651

652
  /*
653
  tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
654
  pPager->dbOrigSize = maxPgno;
655

656
  // release the page
657
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
658
  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
659
    pPage = (SPage *)pNode;
660
    nRef = tdbGetPageRef(pPage);
661
    if (nRef > 1) {
662
      continue;
663
    }
664

665
    pPage->isDirty = 0;
666

667
    tdbTrace("pager/flush drop page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
668
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
669
    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
670
  }
671
  */
672
  return 0;
400,435✔
673
}
674

675
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn);
676

677
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
118,079,285✔
678
                      TXN *pTxn) {
679
  SPage *pPage;
680
  SPgid  pgid;
681
  int    ret;
682
  SPgno  pgno;
683
  u8     loadPage;
684
  bool   flushed = true;
118,079,285✔
685

686
  pgno = *ppgno;
118,079,285✔
687
  loadPage = 1;
118,079,285✔
688

689
  // alloc new page
690
  if (pgno == 0) {
118,079,285✔
691
    loadPage = 0;
721,008✔
692
    ret = tdbPagerAllocPage(pPager, &pgno, pTxn);
721,008✔
693
    if (ret < 0) {
721,105!
UNCOV
694
      tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
×
695
      return ret;
×
696
    }
697
  }
698

699
  if (pgno == 0) {
118,177,741!
700
    tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
×
701
    return TSDB_CODE_INVALID_DATA_FMT;
×
702
  }
703

704
  // fetch a page container
705
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
118,177,741✔
706
  pgid.pgno = pgno;
118,177,741✔
707
  while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn, !flushed, NULL)) == NULL) {
118,578,176✔
708
    flushed = false;
400,435✔
709
    int32_t code = tdbPagerFlushPage(pPager, pTxn, &flushed);
400,435✔
710
    if (code) {
400,435!
711
      tdbError("tdb/pager: %p, pPage: %p, flush page failed.", pPager, pPage);
×
712
      return code;
×
713
    }
714
  }
715

716
  tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
118,186,370✔
717
  // init page if need
718
  if (!TDB_PAGE_INITIALIZED(pPage)) {
118,184,507✔
719
    ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
1,718,146✔
720
    if (ret < 0) {
1,717,872!
721
      tdbError("tdb/pager: %p, pPage: %p, init page failed.", pPager, pPage);
×
722
      return ret;
×
723
    }
724
  }
725

726
  // printf("thread %" PRId64 " pager fetch page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
727
  //        TDB_PAGE_PGNO(pPage), pPage);
728

729
  if (!TDB_PAGE_INITIALIZED(pPage)) {
118,184,233!
730
    tdbError("tdb/pager: %p, pPage: %p, fetch page uninited.", pPager, pPage);
×
731
    return TSDB_CODE_INVALID_DATA_FMT;
×
732
  }
733
  if (pPage->pPager != pPager) {
118,184,233!
734
    tdbError("tdb/pager: %p/%p, fetch page failed.", pPager, pPage->pPager);
×
735
    return TSDB_CODE_INVALID_DATA_FMT;
×
736
  }
737

738
  *ppgno = pgno;
118,184,233✔
739
  *ppPage = pPage;
118,184,233✔
740
  return 0;
118,184,233✔
741
}
742

743
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
115,438,617✔
744
  tdbPCacheRelease(pPager->pCache, pPage, pTxn);
115,438,617✔
745
  // printf("thread %" PRId64 " pager retun page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
746
  //        TDB_PAGE_PGNO(pPage), pPage);
747
}
115,716,728✔
748

749
// tdbPagerFetchFreePage don't want the page data to be modified, so use a nop init function.
750
static int initFreePage(SPage *pPage, void *arg, int init) {
×
751
  return 0;
×
752
}
753

754
// tdbPagerFetchFreePage is only expected to be called in tdbBtreePushFreePage/tdbBtreePopFreePage.
755
// in all other cases, please call tdbPagerFetchPage instead.
756
//
757
// for a page, if it is still in the cache, then the memory copy has the latest data, but for free
758
// pages, tdbPagerFetchPage always load the page data from the file, this is not the desired behavior
759
// in tdbBtreePushFreePage/tdbBtreePopFreePage, so we need tdbPagerFetchFreePage.
760
int tdbPagerFetchFreePage(SPager *pPager, SPgno pgno, SPage **ppPage, TXN *pTxn) {
×
761
  SPage *pPage;
762
  SPgid  pgid;
763
  int    ret;
764
  bool   flushed = true;
×
765
  bool   loaded = false;
×
766

767
  memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
×
768
  pgid.pgno = pgno;
×
769

770
  while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn, !flushed, &loaded)) == NULL) {
×
771
    flushed = false;
×
772
    int32_t code = tdbPagerFlushPage(pPager, pTxn, &flushed);
×
773
    if (code) {
×
774
      tdbError("tdb/pager: %p, pPage: %p, flush page failed.", pPager, pPage);
×
775
      return code;
×
776
    }
777
  }
778

779
  if (!loaded) {
×
780
    ret = tdbPagerInitPage(pPager, pPage, initFreePage, NULL, 1);
×
781
    if (ret < 0) {
×
782
      tdbError("tdb/pager: %p, pPage: %p, init page failed.", pPager, pPage);
×
783
      return ret;
×
784
    }
785
  }
786

787
  pPage->pPager = NULL;
×
788
  *ppPage = pPage;
×
789
  return 0;
×
790
}
791

792
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
35,038✔
793
  int tdbTbPushFreePage(TTB *pTb, SPage *pPage, TXN *pTxn);
794

795
  tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", TDB_PAGE_PGNO(pPage));
35,038✔
796
  int code = tdbTbPushFreePage(pPager->pEnv->pFreeDb, pPage, pTxn);
35,038✔
797
  if (code < 0) {
35,044✔
798
    tdbError("tdb/insert-free-page: tb push failed with ret: %d.", code);
4!
799
    return code;
×
800
  }
801

802
  pPage->pPager = NULL;
35,040✔
803
  return code;
35,040✔
804
}
805

806
static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) {
720,987✔
807
  int tdbTbPopFreePage(TTB *pTb, SPgno* pgno, TXN *pTxn);
808

809
  int  code = 0;
720,987✔
810
  if (!pPager->pEnv->pFreeDb) {
720,987✔
811
    return code;
73,251✔
812
  }
813

814
  code = tdbTbPopFreePage(pPager->pEnv->pFreeDb, pPgno, pTxn);
647,736✔
815
  if (code) {
647,839!
816
    tdbError("tdb/remove-free-page: pop failed with ret: %d.", code);
×
817
  }
818

819
  return 0;
647,820✔
820
}
821

822
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
720,966✔
823
  // Allocate a page from the free list
824
  return tdbPagerRemoveFreePage(pPager, ppgno, pTxn);
720,966✔
825
}
826

827
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
698,377✔
828
  *ppgno = ++pPager->dbFileSize;
698,377✔
829
  // tdbError("tdb/alloc-new-page: %d.", *ppgno);
830
  return 0;
698,377✔
831
}
832

833
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
720,969✔
834
  int ret;
835

836
  *ppgno = 0;
720,969✔
837

838
  // Try to allocate from the free list of the pager
839
  ret = tdbPagerAllocFreePage(pPager, ppgno, pTxn);
720,969✔
840
  if (ret < 0) {
721,102!
841
    return ret;
×
842
  }
843

844
  if (*ppgno != 0) return 0;
721,102✔
845

846
  // Allocate the page by extending the pager
847
  ret = tdbPagerAllocNewPage(pPager, ppgno);
698,371✔
848
  if (ret < 0) {
698,370!
849
    return -1;
×
850
  }
851

852
  if (*ppgno == 0) {
698,370!
853
    tdbError("tdb/pager:%p, alloc new page failed.", pPager);
×
854
    return TSDB_CODE_FAILED;
×
855
  }
856
  return 0;
698,370✔
857
}
858

859
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
1,717,997✔
860
                            u8 loadPage) {
861
  int   ret;
862
  int   lcode;
863
  int   nLoops;
864
  i64   nRead = 0;
1,717,997✔
865
  SPgno pgno = 0;
1,717,997✔
866
  int   init = 0;
1,717,997✔
867

868
  lcode = TDB_TRY_LOCK_PAGE(pPage);
1,717,997✔
869
  if (lcode == P_LOCK_SUCC) {
1,718,175✔
870
    if (TDB_PAGE_INITIALIZED(pPage)) {
1,718,113!
871
      if (TDB_UNLOCK_PAGE(pPage) != 0) {
×
872
        tdbError("tdb/pager:%p, pgno:%d, unlock page failed.", pPager, pgno);
×
873
      }
874
      return 0;
×
875
    }
876

877
    pgno = TDB_PAGE_PGNO(pPage);
1,718,113✔
878

879
    tdbTrace("tdb/pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
1,718,113✔
880
    if (loadPage && pgno <= pPager->dbOrigSize) {
2,715,007!
881
      init = 1;
997,052✔
882

883
      nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
997,052✔
884
      tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
996,978✔
885
      if (nRead < pPage->pageSize) {
996,978!
886
        tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize);
×
887
        if (TDB_UNLOCK_PAGE(pPage) < 0) {
×
888
          tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " unlock page failed.", pPager, pgno,
×
889
                   nRead, pPage->pageSize);
890
        }
891
        return TAOS_SYSTEM_ERROR(ERRNO);
×
892
      }
893

894
      int32_t encryptAlgorithm = pPager->pEnv->encryptAlgorithm;
996,978✔
895
      char   *encryptKey = pPager->pEnv->encryptKey;
996,978✔
896

897
      if (encryptAlgorithm == DND_CA_SM4) {
996,978!
898
        // tdbInfo("CBC_Decrypt key:%d %s %s", encryptAlgorithm, encryptKey, __FUNCTION__);
899

900
        // uint8_t flags = pPage->pData[0];
901
        // tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d before Decrypt", ((i64)pPage->pageSize) * (pgno - 1), flags);
902

903
        unsigned char packetData[128];
904

905
        int32_t count = 0;
×
906
        while (count < pPage->pageSize) {
×
907
          SCryptOpts opts = {0};
×
908
          opts.len = 128;
×
909
          opts.source = pPage->pData + count;
×
910
          opts.result = packetData;
×
911
          opts.unitLen = 128;
×
912
          tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
×
913

914
          int newLen = CBC_Decrypt(&opts);
×
915

916
          memcpy(pPage->pData + count, packetData, newLen);
×
917
          count += newLen;
×
918
        }
919
        // tdbInfo("CBC tdb offset:%" PRId64 ", Decrypt count:%d %s", ((i64)pPage->pageSize) * (pgno - 1), count,
920
        // __FUNCTION__);
921

922
        // tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d after Decrypt %s", ((i64)pPage->pageSize) * (pgno - 1),
923
        // pPage->pData[0], __FUNCTION__);
924
      }
925
    } else {
926
      init = 0;
720,977✔
927
    }
928

929
    // tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d initPage %s", ((i64)pPage->pageSize) * (pgno - 1), pPage->pData[0],
930
    // __FUNCTION__);
931

932
    ret = (*initPage)(pPage, arg, init);
1,717,955✔
933
    if (ret < 0) {
1,717,708!
934
      tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " init page failed.", pPager, pgno, nRead,
×
935
               pPage->pageSize);
936
      if (TDB_UNLOCK_PAGE(pPage) != 0) {
×
937
        tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " unlock page failed.", pPager, pgno, nRead,
×
938
                 pPage->pageSize);
939
      }
940
      return ret;
×
941
    }
942

943
    tmemory_barrier();
1,717,708✔
944

945
    pPage->pPager = pPager;
1,717,708✔
946

947
    if (TDB_UNLOCK_PAGE(pPage) != 0) {
1,717,708!
948
      tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " unlock page failed.", pPager, pgno, nRead,
×
949
               pPage->pageSize);
950
    }
951
  } else if (lcode == P_LOCK_BUSY) {
62✔
952
    nLoops = 0;
7✔
953
    for (;;) {
954
      if (TDB_PAGE_INITIALIZED(pPage)) break;
28,262✔
955
      nLoops++;
28,255✔
956
      if (nLoops > 1000) {
28,255✔
957
        (void)sched_yield();
27✔
958
        nLoops = 0;
27✔
959
      }
960
    }
961
  } else {
962
    tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " lock page failed.", pPager, pgno, nRead,
55!
963
             pPage->pageSize);
964
    return TSDB_CODE_FAILED;
×
965
  }
966

967
  return 0;
1,717,862✔
968
}
969

970
// ---------------------------- Journal manipulation
971
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
447,957✔
972
  int   ret;
973
  SPgno pgno;
974

975
  pgno = TDB_PAGE_PGNO(pPage);
447,957✔
976

977
  ret = tdbOsWrite(pPager->pActiveTxn->jfd, &pgno, sizeof(pgno));
447,957✔
978
  if (ret < 0) {
448,271!
979
    tdbError("failed to write pgno due to %s. file:%s, pgno:%u, txnId:%" PRId64, strerror(ERRNO), pPager->jFileName,
×
980
             pgno, pPager->pActiveTxn->txnId);
981
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
982
  }
983

984
  ret = tdbOsWrite(pPager->pActiveTxn->jfd, pPage->pData, pPage->pageSize);
448,271✔
985
  if (ret < 0) {
448,384✔
986
    tdbError("failed to write page data due to %s. file:%s, pageSize:%d, txnId:%" PRId64, strerror(ERRNO),
81!
987
             pPager->jFileName, pPage->pageSize, pPager->pActiveTxn->txnId);
988
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
81✔
989
  }
990

991
  return 0;
448,303✔
992
}
993
/*
994
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
995
  i64 offset;
996
  int ret;
997

998
  offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
999
  if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
1000
    tdbError("failed to lseek due to %s. file:%s, offset:%" PRId64, strerror(ERRNO), pPager->dbFileName, offset);
1001
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
1002
    return -1;
1003
  }
1004

1005
  ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
1006
  if (ret < 0) {
1007
    tdbError("failed to write page data due to %s. file:%s, pageSize:%d", strerror(ERRNO), pPager->dbFileName,
1008
             pPage->pageSize);
1009
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
1010
    return -1;
1011
  }
1012

1013
  return 0;
1014
}
1015
*/
1016
static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) {
1,181,650✔
1017
  i64 offset;
1018
  int ret;
1019

1020
  offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
1,181,650✔
1021

1022
  char *buf = tdbEncryptPage(pPager, pPage->pData, pPage->pageSize, __FUNCTION__, offset);
1,181,650✔
1023

1024
  ret = tdbOsPWrite(pPager->fd, buf, pPage->pageSize, offset);
1,181,604✔
1025
  if (ret < 0) {
1,181,592!
1026
    tdbFreeEncryptBuf(pPager, buf);
×
1027
    tdbError("failed to pwrite page data due to %s. file:%s, pageSize:%d", strerror(ERRNO), pPager->dbFileName,
×
1028
             pPage->pageSize);
1029
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1030
  }
1031

1032
  tdbFreeEncryptBuf(pPager, buf);
1,181,592✔
1033

1034
  return 0;
1,181,558✔
1035
}
1036

1037
static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
100✔
1038
  int   ret = 0;
100✔
1039
  SPgno journalSize = 0;
100✔
1040
  u8   *pageBuf = NULL;
100✔
1041

1042
  tdb_fd_t jfd = tdbOsOpen(jFileName, TDB_O_RDWR, 0755);
100✔
1043
  if (jfd == NULL) {
100!
1044
    return 0;
×
1045
  }
1046

1047
  ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
100✔
1048
  if (ret < 0) {
100!
1049
    return TAOS_SYSTEM_ERROR(ERRNO);
×
1050
  }
1051

1052
  if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
100!
1053
    tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(ERRNO), pPager->dbFileName);
×
1054
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1055
  }
1056

1057
  pageBuf = tdbOsCalloc(1, pPager->pageSize);
100!
1058
  if (pageBuf == NULL) {
100!
1059
    return terrno;
×
1060
  }
1061

1062
  tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName);
100✔
1063

1064
  for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
553✔
1065
    // read pgno & the page from journal
1066
    SPgno pgno;
1067

1068
    int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
452✔
1069
    if (ret < 0) {
456!
1070
      tdbOsFree(pageBuf);
×
1071
      return TAOS_SYSTEM_ERROR(ERRNO);
×
1072
    }
1073

1074
    tdbTrace("pager/restore: restore pgno:%d,", pgno);
456!
1075

1076
    ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
456✔
1077
    if (ret < 0) {
456!
1078
      tdbOsFree(pageBuf);
×
1079
      return TAOS_SYSTEM_ERROR(ERRNO);
×
1080
    }
1081

1082
    i64 offset = pPager->pageSize * (pgno - 1);
456✔
1083
    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
456!
1084
      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(ERRNO), pPager->dbFileName, offset);
×
1085
      tdbOsFree(pageBuf);
×
1086
      return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1087
    }
1088

1089
    char *buf = tdbEncryptPage(pPager, pageBuf, pPager->pageSize, __FUNCTION__, offset);
454✔
1090
    if (buf == NULL) {
454!
1091
      return terrno;
×
1092
    }
1093

1094
    ret = tdbOsWrite(pPager->fd, buf, pPager->pageSize);
454✔
1095
    if (ret < 0) {
454!
1096
      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(ERRNO), pPager->dbFileName,
×
1097
               pPager->pageSize);
1098
      tdbFreeEncryptBuf(pPager, buf);
×
1099
      tdbOsFree(pageBuf);
×
1100
      return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1101
    }
1102

1103
    tdbFreeEncryptBuf(pPager, buf);
454✔
1104
  }
1105

1106
  if (tdbOsFSync(pPager->fd) < 0) {
101!
1107
    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(ERRNO), pPager->dbFileName);
×
1108
    tdbOsFree(pageBuf);
×
1109
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1110
  }
1111

1112
  tdbOsFree(pageBuf);
100!
1113

1114
  if (tdbOsClose(jfd) < 0) {
100!
1115
    tdbError("failed to close jfd due to %s. jFileName:%s", strerror(ERRNO), pPager->jFileName);
×
1116
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1117
  }
1118

1119
  if (tdbOsRemove(jFileName) < 0 && ERRNO != ENOENT) {
100!
1120
    tdbError("failed to remove file due to %s. jFileName:%s", strerror(ERRNO), pPager->jFileName);
×
1121
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1122
  }
1123

1124
  return 0;
100✔
1125
}
1126

1127
static int32_t txnIdCompareDesc(const void *pLeft, const void *pRight) {
×
1128
  int64_t lhs = *(int64_t *)pLeft;
×
1129
  int64_t rhs = *(int64_t *)pRight;
×
1130
  return lhs > rhs ? -1 : 1;
×
1131
}
1132

1133
int tdbPagerRestoreJournals(SPager *pPager) {
360,388✔
1134
  int32_t        code = 0;
360,388✔
1135
  tdbDirEntryPtr pDirEntry;
1136
  tdbDirPtr      pDir = taosOpenDir(pPager->pEnv->dbName);
360,388✔
1137
  if (pDir == NULL) {
360,391!
1138
    tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(ERRNO));
×
1139
    return terrno;
×
1140
  }
1141

1142
  SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
360,391✔
1143
  if (pTxnList == NULL) {
360,353!
1144
    return terrno;
×
1145
  }
1146

1147
  while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
1,564,293✔
1148
    char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
1,204,003✔
1149
    if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
1,203,923✔
1150
      int64_t txnId = -1;
100✔
1151
      (void)sscanf(name, TDB_MAINDB_NAME "-journal.%" PRId64, &txnId);
100✔
1152
      if (taosArrayPush(pTxnList, &txnId) == NULL) {
100!
1153
        return terrno;
×
1154
      }
1155
    }
1156
  }
1157
  taosArraySort(pTxnList, txnIdCompareDesc);
360,301✔
1158
  for (int i = 0; i < TARRAY_SIZE(pTxnList); ++i) {
360,432✔
1159
    int64_t *pTxnId = taosArrayGet(pTxnList, i);
100✔
1160
    char     jname[TD_PATH_MAX] = {0};
100✔
1161
    int      dirLen = strlen(pPager->pEnv->dbName);
100✔
1162
    memcpy(jname, pPager->pEnv->dbName, dirLen);
100✔
1163
    jname[dirLen] = '/';
100✔
1164
    (void)tsnprintf(jname + dirLen + 1, TD_PATH_MAX - dirLen - 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId);
100✔
1165
    code = tdbPagerRestore(pPager, jname);
100✔
1166
    if (code) {
100!
1167
      taosArrayDestroy(pTxnList);
×
1168
      tdbCloseDir(&pDir);
×
1169
      tdbError("failed to restore file due to %s. jFileName:%s", tstrerror(code), jname);
×
1170
      return code;
×
1171
    }
1172
  }
1173

1174
  taosArrayDestroy(pTxnList);
360,332✔
1175
  tdbCloseDir(&pDir);
360,359✔
1176

1177
  return 0;
360,513✔
1178
}
1179

1180
int tdbPagerRollback(SPager *pPager) {
×
1181
  tdbDirEntryPtr pDirEntry;
1182
  tdbDirPtr      pDir = taosOpenDir(pPager->pEnv->dbName);
×
1183
  if (pDir == NULL) {
×
1184
    tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(ERRNO));
×
1185
    return terrno;
×
1186
  }
1187

1188
  while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
×
1189
    char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
×
1190

1191
    if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
×
1192
      char jname[TD_PATH_MAX] = {0};
×
1193
      int  dirLen = strlen(pPager->pEnv->dbName);
×
1194
      memcpy(jname, pPager->pEnv->dbName, dirLen);
×
1195
      jname[dirLen] = '/';
×
1196
      memcpy(jname + dirLen + 1, name, strlen(name));
×
1197
      if (tdbOsRemove(jname) < 0 && ERRNO != ENOENT) {
×
1198
        tdbCloseDir(&pDir);
×
1199

1200
        tdbError("failed to remove file due to %s. jFileName:%s", strerror(ERRNO), name);
×
1201
        return terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1202
      }
1203
    }
1204
  }
1205

1206
  tdbCloseDir(&pDir);
×
1207

1208
  return 0;
×
1209
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc