• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.54
/source/libs/wal/src/walRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "taoserror.h"
18
#include "wal.h"
19
#include "walInt.h"
20

21
// Allocate and initialise a reader bound to `pWal`.
//
// pWal : WAL the reader reads from; the pointer is stored, not copied.
// cond : optional filter; copied into the reader when non-NULL, otherwise the
//        scanNotApplied / scanMeta / enableRef flags are cleared.
// id   : reader id to use; 0 asks for a fresh id from tGenIdPI64().
//
// Returns the new reader, or NULL with terrno set when allocation or mutex
// initialisation fails. Release the reader with walCloseReader().
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
  SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
  if (pReader == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pReader->pWal = pWal;
  pReader->readerId = (id != 0) ? id : tGenIdPI64();
  pReader->pIdxFile = NULL;
  pReader->pLogFile = NULL;
  // -1 marks "not positioned yet" for both the version cursor and the open file.
  pReader->curVersion = -1;
  pReader->curFileFirstVer = -1;
  pReader->capacity = 0;
  if (cond) {
    pReader->cond = *cond;
  } else {
    //    pReader->cond.scanUncommited = 0;
    pReader->cond.scanNotApplied = 0;
    pReader->cond.scanMeta = 0;
    pReader->cond.enableRef = 0;
  }

  terrno = taosThreadMutexInit(&pReader->mutex, NULL);
  if (terrno) {
    taosMemoryFree(pReader);
    return NULL;
  }

  // Only the fixed-size head is allocated here; the body area is grown on demand
  // by the fetch/read functions (tracked through `capacity`).
  pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
  if (pReader->pHead == NULL) {
    // The mutex was initialised above, so it must be torn down before the
    // reader memory is released.
    TAOS_UNUSED(taosThreadMutexDestroy(&pReader->mutex));
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReader);
    return NULL;
  }

  /*if (pReader->cond.enableRef) {*/
  /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
  /*}*/

  return pReader;
}
63

64
// Release everything owned by a reader created with walOpenReader().
//
// Closes the idx and log file handles, frees the record buffer, destroys the
// reader mutex and finally frees the reader itself. A NULL reader is ignored.
// The caller must make sure no other thread is still using the reader.
void walCloseReader(SWalReader *pReader) {
  if (pReader == NULL) return;

  TAOS_UNUSED(taosCloseFile(&pReader->pIdxFile));
  TAOS_UNUSED(taosCloseFile(&pReader->pLogFile));
  taosMemoryFreeClear(pReader->pHead);
  // Pair with the taosThreadMutexInit() done when the reader was opened.
  TAOS_UNUSED(taosThreadMutexDestroy(&pReader->mutex));
  taosMemoryFree(pReader);
}
72

73
// Advance the reader to the next record the filter condition asks for and load it.
//
// Starting at the reader's current version, record heads are fetched one by one
// up to the applied version. A record is delivered (its body fetched) when it is
//   - a TDMT_VND_SUBMIT message, or
//   - a TDMT_VND_DELETE message and cond.deleteMsg == 1, or
//   - a meta message and cond.scanMeta is set, or
//   - a TDMT_VND_DROP_TABLE message and cond.scanDropCtb is set.
// Any other record has its body skipped.
//
// Returns the result of walFetchBody() for a delivered record,
// TSDB_CODE_WAL_LOG_NOT_EXIST when the cursor is already past the applied
// version, the failing code of a head fetch / body skip, or TSDB_CODE_FAILED
// when the applied version is reached without finding a wanted record.
int32_t walNextValidMsg(SWalReader *pReader) {
  int64_t cursor = pReader->curVersion;
  int64_t lastVer = walGetLastVer(pReader->pWal);
  int64_t committedVer = walGetCommittedVer(pReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pReader->pWal);

  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
         ", applied index:%" PRId64,
         pReader->pWal->cfg.vgId, cursor, lastVer, committedVer, appliedVer);

  if (cursor > appliedVer) {
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  // The skip path advances pReader->curVersion, so re-read it after each round.
  for (; cursor <= appliedVer; cursor = pReader->curVersion) {
    TAOS_CHECK_RETURN(walFetchHead(pReader, cursor));

    int32_t msgType = pReader->pHead->head.msgType;
    bool    wanted = false;

    if (msgType == TDMT_VND_SUBMIT) {
      wanted = true;
    } else if ((msgType == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) {
      wanted = true;
    } else if (IS_META_MSG(msgType) && pReader->cond.scanMeta) {
      wanted = true;
    } else if (msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
      wanted = true;
    }

    if (wanted) {
      TAOS_RETURN(walFetchBody(pReader));
    }

    TAOS_CHECK_RETURN(walSkipFetchBody(pReader));
  }

  TAOS_RETURN(TSDB_CODE_FAILED);
}
104

105
// Return the version the reader is currently positioned at (pReader->curVersion).
int64_t walReaderGetCurrentVer(const SWalReader *pReader) {
  return pReader->curVersion;
}
2,411✔
106
// Return the first version of the WAL this reader is attached to, as reported
// by walGetFirstVer().
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) {
  return walGetFirstVer(pReader->pWal);
}
565✔
107
// Atomically store `ver` into the reader's skipToVersion field.
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver) {
  atomic_store_64(&pReader->skipToVersion, ver);
}
1✔
108

109
// this function is NOT multi-thread safe, and no need to be.
110
// Consume the reader's skipToVersion: return the stored value and reset the
// field to 0. The read and the reset are two plain (non-atomic) steps.
int64_t walReaderGetSkipToVersion(SWalReader *pReader) {
  int64_t pending = pReader->skipToVersion;

  pReader->skipToVersion = 0;

  return pending;
}
115

116
// Report the version range this reader may read.
//
// *sver receives the WAL's first version. *ever receives the last version when
// cond.scanUncommited is set, otherwise the committed version.
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
  SWal *pWal = pReader->pWal;

  *sver = walGetFirstVer(pWal);

  int64_t newest = walGetLastVer(pWal);
  int64_t committed = walGetCommittedVer(pWal);

  if (pReader->cond.scanUncommited) {
    *ever = newest;
  } else {
    *ever = committed;
  }
}
6,951✔
122

123
// Clamp an offset to the WAL's first version.
//
// The first version is sampled under the WAL read lock; when pOffset->version
// lies before it, the offset is moved forward to that first version.
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) {
  SWal *pWal = pWalReader->pWal;

  TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
  int64_t oldestVer = walGetFirstVer(pWal);
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  if (oldestVer > pOffset->version) {
    pOffset->version = oldestVer;
  }
}
16,447✔
133

134
// Position the reader's log file at the record for `ver`.
//
// The idx file holds one SWalIdxEntry per version, so the entry for `ver` sits
// at (ver - fileFirstVer) * sizeof(SWalIdxEntry). That entry is read and its
// `offset` is used as the absolute seek position in the log file.
//
// Returns TSDB_CODE_SUCCESS, terrno when a seek or read fails, or
// TSDB_CODE_WAL_FILE_CORRUPTED when the idx entry could only be read partially.
static int32_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
  TdFilePtr idxFp = pReader->pIdxFile;
  TdFilePtr logFp = pReader->pLogFile;

  // Step 1: jump to the idx slot belonging to `ver`.
  int64_t idxPos = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
  int64_t rc = taosLSeekFile(idxFp, idxPos, SEEK_SET);
  if (rc < 0) {
    wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, idxPos, terrstr());

    TAOS_RETURN(terrno);
  }

  // Step 2: load the idx entry.
  SWalIdxEntry entry = {0};
  rc = taosReadFile(idxFp, &entry, sizeof(SWalIdxEntry));
  if (rc < 0) {
    wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
    TAOS_RETURN(terrno);
  }
  if (rc != sizeof(SWalIdxEntry)) {
    wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %d", pReader->pWal->cfg.vgId,
           rc, (int32_t)sizeof(SWalIdxEntry));
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
  }

  // Step 3: move the log file to the record the entry points at.
  rc = taosLSeekFile(logFp, entry.offset, SEEK_SET);
  if (rc < 0) {
    wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, entry.offset, terrstr());
    TAOS_RETURN(terrno);
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
170

171
// Switch the reader to the log/idx file pair whose first version is `fileFirstVer`.
//
// The currently open pair is closed first. The new pair is installed in the
// reader only when BOTH files open successfully; on any failure the reader is
// left with no open files and curFileFirstVer == -1 (the same "no file" state
// walReadReset() produces), so the next seek is forced to reopen.
//
// Returns TSDB_CODE_SUCCESS, or the terrno of the failed open.
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN] = {0};

  TAOS_UNUSED(taosCloseFile(&pReader->pIdxFile));
  TAOS_UNUSED(taosCloseFile(&pReader->pLogFile));
  // The old handles are gone; do not keep claiming the old file is current,
  // otherwise a later seek into it would skip reopening and use closed handles.
  pReader->curFileFirstVer = -1;

  walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogFile == NULL) {
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
    TAOS_RETURN(terrno);
  }

  walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxFile == NULL) {
    // Remember the open error before closing the log file can overwrite terrno.
    int32_t code = terrno;
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
    // Do not leave a half-switched reader (log file without its idx file).
    TAOS_UNUSED(taosCloseFile(&pLogFile));
    TAOS_RETURN(code);
  }

  pReader->pLogFile = pLogFile;
  pReader->pIdxFile = pIdxFile;
  pReader->curFileFirstVer = fileFirstVer;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
199

200
// Position the reader at version `ver` without any range or "already there" checks.
//
// Looks up (under the WAL read lock) the file whose first version is the
// greatest one <= ver, switches the reader to that file when it is not the one
// currently open, seeks the log file to the record and records `ver` as the
// current version.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_WAL_INVALID_VER when no file covers
// `ver`, or the error of the file switch / seek.
static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
  SWal *pWal = pReader->pWal;

  // Search key: only firstVer takes part in the lookup.
  SWalFileInfo key = {.firstVer = ver};

  TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
  SWalFileInfo *pFound = taosArraySearch(pWal->fileInfoSet, &key, compareWalFileInfo, TD_LE);
  if (pFound == NULL) {
    wError("vgId:%d, failed to find WAL log file with index:%" PRId64, pReader->pWal->cfg.vgId, ver);
    TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
    TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
  }
  // Take what is needed while still holding the lock; the array entry must not
  // be touched after the unlock.
  int64_t fileFirstVer = pFound->firstVer;
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  if (fileFirstVer != pReader->curFileFirstVer) {
    // error code was set inner
    TAOS_CHECK_RETURN(walReadChangeFile(pReader, fileFirstVer));
  }

  // error code was set inner
  TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, fileFirstVer, ver));

  wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
  pReader->curVersion = ver;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
229

230
// Move the reader to version `ver`.
//
// Nothing is done when the reader already stands at `ver`. A version outside
// [vers.firstVer, vers.lastVer] yields TSDB_CODE_WAL_LOG_NOT_EXIST; otherwise
// the work is delegated to walReadSeekVerImpl() and its error, if any, is
// returned.
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
  SWal *pWal = pReader->pWal;

  if (pReader->curVersion == ver) {
    wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  bool outOfRange = (ver > pWal->vers.lastVer) || (ver < pWal->vers.firstVer);
  if (outOfRange) {
    wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
          ver, pWal->vers.firstVer, pWal->vers.lastVer);

    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  TAOS_CHECK_RETURN(walReadSeekVerImpl(pReader, ver));

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
249

250
// Read and validate the record head for version `ver` into pRead->pHead.
//
// The reader is first seeked to `ver` when it is not already there. If the head
// read then returns 0 bytes and no seek has happened in this call, the position
// is re-established once with walReadSeekVerImpl() and the read is retried.
//
// Returns TSDB_CODE_SUCCESS; TSDB_CODE_FAILED when `ver` is beyond the commit
// version; terrno on a read error; TSDB_CODE_WAL_FILE_CORRUPTED on a short read
// or a head checksum mismatch; or the error of a failed seek.
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
  bool repositioned = false;

  // TODO: valid ver
  if (ver > pRead->pWal->vers.commitVer) {
    TAOS_RETURN(TSDB_CODE_FAILED);
  }

  if (pRead->curVersion != ver) {
    TAOS_CHECK_RETURN(walReaderSeekVer(pRead, ver));

    repositioned = true;
  }

  for (;;) {
    int64_t nread = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
    if (nread == sizeof(SWalCkHead)) {
      break;
    }
    if (nread < 0) {
      TAOS_RETURN(terrno);
    }
    // Short read, or an empty read even after repositioning: give up.
    if (nread != 0 || repositioned) {
      TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
    }

    // Empty read on the first attempt: re-seek once and try again.
    TAOS_CHECK_RETURN(walReadSeekVerImpl(pRead, ver));

    repositioned = true;
  }

  if (walValidHeadCksum(pRead->pHead) != 0) {
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64,
           pRead->pWal->cfg.vgId, ver, pRead->readerId);

    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
295

296
// Skip over the body of the record whose head was just fetched.
//
// The log file position is advanced by the on-disk body length (the
// ENCRYPTED_LEN of head.bodyLen when cfg.encryptAlgorithm == 1, otherwise
// head.bodyLen itself) and the reader's current version is incremented.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the seek fails.
int32_t walSkipFetchBody(SWalReader *pRead) {
  wDebug("vgId:%d, skip:%" PRId64 ", first index:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64
         ", applied index:%" PRId64 ", reader:0x%" PRIx64,
         pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
         pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);

  int32_t bodyLen = pRead->pHead->head.bodyLen;
  // TODO: dmchen emun
  int32_t onDiskLen = (pRead->pWal->cfg.encryptAlgorithm == 1) ? ENCRYPTED_LEN(bodyLen) : bodyLen;

  if (taosLSeekFile(pRead->pLogFile, onDiskLen, SEEK_CUR) < 0) {
    TAOS_RETURN(terrno);
  }

  pRead->curVersion++;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
317

318
// Read, decrypt and verify the body of the record whose head was just fetched.
//
// The record buffer is grown when the on-disk body (ENCRYPTED_LEN of
// head.bodyLen when cfg.encryptAlgorithm == 1) does not fit. On success the
// reader's current version is incremented.
//
// Returns TSDB_CODE_SUCCESS; terrno when the buffer cannot be grown or the read
// fails; TSDB_CODE_WAL_FILE_CORRUPTED on a short read, a version mismatch or a
// body checksum mismatch; or the error reported by decryptBody().
int32_t walFetchBody(SWalReader *pRead) {
  SWalCont *pReadHead = &pRead->pHead->head;
  int64_t   ver = pReadHead->version;
  int32_t   vgId = pRead->pWal->cfg.vgId;
  int64_t   id = pRead->readerId;
  SWalVer  *pVer = &pRead->pWal->vers;

  wDebug("vgId:%d, fetch body:%" PRId64 ", first index:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64
         ", applied index:%" PRId64 ", reader:0x%" PRIx64,
         vgId, ver, pVer->firstVer, pVer->commitVer, pVer->lastVer, pVer->appliedVer, id);

  int32_t plainBodyLen = pReadHead->bodyLen;
  int32_t cryptedBodyLen = plainBodyLen;

  // TODO: dmchen emun
  if (pRead->pWal->cfg.encryptAlgorithm == 1) {
    cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
  }

  if (pRead->capacity < cryptedBodyLen) {
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
    if (ptr == NULL) {
      TAOS_RETURN(terrno);
    }
    pRead->pHead = ptr;
    // The buffer may have moved; refresh the cached head pointer.
    pReadHead = &pRead->pHead->head;
    pRead->capacity = cryptedBodyLen;
  }

  int64_t readLen = taosReadFile(pRead->pLogFile, pReadHead->body, cryptedBodyLen);
  if (readLen != cryptedBodyLen) {
    // A negative result is an I/O error (code in terrno); anything else is a
    // short read, i.e. the file ends before the record does.
    if (readLen < 0) {
      int32_t code = terrno;
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, reader:0x%" PRIx64,
             vgId, pReadHead->version, ver, tstrerror(code), id);
      TAOS_RETURN(code);
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64
             ", since file corrupted, reader:0x%" PRIx64,
             vgId, pReadHead->version, ver, id);
      TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
    }
  }

  if (pReadHead->version != ver) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", reader:0x%" PRIx64, vgId,
           pReadHead->version, ver, id);
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
  }

  TAOS_CHECK_RETURN(decryptBody(&pRead->pWal->cfg, pRead->pHead, plainBodyLen, __FUNCTION__));

  if (walValidBodyCksum(pRead->pHead) != 0) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, reader:0x%" PRIx64, vgId,
           ver, id);

    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
  }

  pRead->curVersion++;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
379

380
// Read the complete record (head and body) for version `ver` into pReader->pHead.
//
// After the emptiness and range checks, the whole operation runs under the
// reader mutex: seek to `ver` if needed, read and verify the head, grow the
// buffer if required, read the body, decrypt it and verify its checksum. On
// success the reader's current version is incremented.
//
// Returns TSDB_CODE_SUCCESS; TSDB_CODE_WAL_LOG_NOT_EXIST when the WAL is empty
// or `ver` is outside [vers.firstVer, vers.lastVer]; terrno on read or
// allocation failure; TSDB_CODE_WAL_FILE_CORRUPTED on a short read, version
// mismatch or checksum mismatch; or the error of a failed seek / decrypt.
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
  wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
  int64_t contLen = 0;
  int32_t code = 0;
  int32_t plainBodyLen = 0;
  int32_t cryptedBodyLen = 0;
  bool    seeked = false;

  if (walIsEmpty(pReader->pWal)) {
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
           ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);

    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  if (taosThreadMutexLock(&pReader->mutex) != 0) {
    wError("vgId:%d, failed to lock mutex", pReader->pWal->cfg.vgId);
  }

  // From here on every exit goes through _exit so the mutex is always released.

  if (pReader->curVersion != ver) {
    code = walReaderSeekVer(pReader, ver);
    if (code) {
      wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
      goto _exit;
    }

    seeked = true;
  }

  while (1) {
    contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
    if (contLen == sizeof(SWalCkHead)) {
      break;
    } else if (contLen == 0 && !seeked) {
      // Empty read without a prior seek in this call: re-seek once and retry.
      code = walReadSeekVerImpl(pReader, ver);
      if (code) {
        goto _exit;
      }
      seeked = true;
      continue;
    } else {
      if (contLen < 0) {
        code = terrno;
        wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
               pReader->pWal->cfg.vgId, ver, tstrerror(code));
      } else {
        code = TSDB_CODE_WAL_FILE_CORRUPTED;
        wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", not enough bytes read, readLen:%" PRId64
               ", expectedLen:%d",
               pReader->pWal->cfg.vgId, ver, contLen, (int32_t)sizeof(SWalCkHead));
      }
      goto _exit;
    }
  }

  if (walValidHeadCksum(pReader->pHead) != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
           ver);
    code = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _exit;
  }

  plainBodyLen = pReader->pHead->head.bodyLen;
  cryptedBodyLen = plainBodyLen;

  // TODO: dmchen emun
  if (pReader->pWal->cfg.encryptAlgorithm == 1) {
    cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
  }

  if (pReader->capacity < cryptedBodyLen) {
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
    if (ptr == NULL) {
      // Capture the allocation error now; the unlock below may overwrite terrno.
      code = terrno;
      goto _exit;
    }
    pReader->pHead = ptr;
    pReader->capacity = cryptedBodyLen;
  }

  if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) {
    if (contLen < 0) {
      code = terrno;
    } else {
      code = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
           pReader->pWal->cfg.vgId, ver, tstrerror(code));
    goto _exit;
  }

  if (pReader->pHead->head.version != ver) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
           pReader->pHead->head.version, ver);
    //    pReader->curInvalid = 1;
    code = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _exit;
  }

  code = decryptBody(&pReader->pWal->cfg, pReader->pHead, plainBodyLen, __FUNCTION__);
  if (code) {
    goto _exit;
  }

  if (walValidBodyCksum(pReader->pHead) != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
           ver);
    uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, plainBodyLen);
    uint32_t logCkSum = pReader->pHead->cksumBody;
    wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
    //    pReader->curInvalid = 1;
    code = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _exit;
  }

  // Record fully read and verified: advance to the next version.
  pReader->curVersion++;
  code = TSDB_CODE_SUCCESS;

_exit:
  TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));

  TAOS_RETURN(code);
}
518

519
// Decrypt a record body in place when encryption is enabled.
//
// cfg          : WAL configuration; encryptAlgorithm == 1 enables decryption and
//                encryptKey supplies the key.
// pHead        : record whose head.body holds ENCRYPTED_LEN(plainBodyLen) bytes
//                of input and receives plainBodyLen bytes of output.
// plainBodyLen : length of the decrypted body.
// func         : caller name; only referenced by the disabled debug log.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the scratch buffer cannot be
// allocated. The result of CBC_Decrypt() is not checked.
int32_t decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func) {
  // TODO: dmchen emun
  if (cfg->encryptAlgorithm != 1) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t cipherLen = ENCRYPTED_LEN(plainBodyLen);
  char   *scratch = taosMemoryMalloc(cipherLen);
  if (scratch == NULL) {
    TAOS_RETURN(terrno);
  }

  // Decrypt from the record body into the scratch buffer.
  SCryptOpts opts;
  opts.source = pHead->head.body;
  opts.result = scratch;
  opts.len = cipherLen;
  opts.unitLen = 16;
  tstrncpy((char *)opts.key, cfg->encryptKey, sizeof(opts.key));

  (void)CBC_Decrypt(&opts);

  // wDebug("CBC_Decrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", count, plainBodyLen, func);

  // Only the plain-length prefix of the decrypted data is copied back.
  (void)memcpy(pHead->head.body, scratch, plainBodyLen);

  taosMemoryFree(scratch);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
546

547
// Return the reader to its unpositioned state.
//
// Under the reader mutex, both file handles are closed and the current version
// and current file marker are set back to -1. A failed lock is only logged.
void walReadReset(SWalReader *pReader) {
  int32_t lockRc = taosThreadMutexLock(&pReader->mutex);
  if (lockRc != 0) {
    wError("vgId:%d, failed to lock mutex", pReader->pWal->cfg.vgId);
  }

  TAOS_UNUSED(taosCloseFile(&pReader->pIdxFile));
  TAOS_UNUSED(taosCloseFile(&pReader->pLogFile));

  pReader->curFileFirstVer = -1;
  pReader->curVersion = -1;

  TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
}
1,924✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc