• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.44
/source/libs/wal/src/walRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "taoserror.h"
18
#include "wal.h"
19
#include "walInt.h"
20

21
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
29,313✔
22
  SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
29,313✔
23
  if (pReader == NULL) {
29,322!
24
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
25
    return NULL;
×
26
  }
27

28
  pReader->pWal = pWal;
29,323✔
29
  pReader->readerId = (id != 0) ? id : tGenIdPI64();
29,323✔
30
  pReader->pIdxFile = NULL;
29,323✔
31
  pReader->pLogFile = NULL;
29,323✔
32
  pReader->curVersion = -1;
29,323✔
33
  pReader->curFileFirstVer = -1;
29,323✔
34
  pReader->capacity = 0;
29,323✔
35
  if (cond) {
29,323✔
36
    pReader->cond = *cond;
3,924✔
37
  } else {
38
    //    pReader->cond.scanUncommited = 0;
39
    pReader->cond.scanNotApplied = 0;
25,399✔
40
    pReader->cond.scanMeta = 0;
25,399✔
41
    pReader->cond.enableRef = 0;
25,399✔
42
  }
43

44
  terrno = taosThreadMutexInit(&pReader->mutex, NULL);
29,323✔
45
  if (terrno) {
29,321!
46
    taosMemoryFree(pReader);
×
47
    return NULL;
×
48
  }
49

50
  pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
29,323✔
51
  if (pReader->pHead == NULL) {
29,323!
52
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
53
    taosMemoryFree(pReader);
×
54
    return NULL;
×
55
  }
56

57
  /*if (pReader->cond.enableRef) {*/
58
  /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
59
  /*}*/
60

61
  return pReader;
29,323✔
62
}
63

64
void walCloseReader(SWalReader *pReader) {
29,274✔
65
  if (pReader == NULL) return;
29,274✔
66

67
  TAOS_UNUSED(taosCloseFile(&pReader->pIdxFile));
29,195✔
68
  TAOS_UNUSED(taosCloseFile(&pReader->pLogFile));
29,195✔
69
  taosMemoryFreeClear(pReader->pHead);
29,198✔
70
  taosMemoryFree(pReader);
29,201✔
71
}
72

73
int32_t walNextValidMsg(SWalReader *pReader) {
729,875✔
74
  int64_t fetchVer = pReader->curVersion;
729,875✔
75
  int64_t lastVer = walGetLastVer(pReader->pWal);
729,875✔
76
  int64_t committedVer = walGetCommittedVer(pReader->pWal);
729,871✔
77
  int64_t appliedVer = walGetAppliedVer(pReader->pWal);
729,875✔
78

79
  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64,
729,873✔
80
         pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
81
  if (fetchVer > appliedVer) {
729,876✔
82
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
103,329✔
83
  }
84

85
  while (fetchVer <= appliedVer) {
820,727✔
86
    TAOS_CHECK_RETURN(walFetchHead(pReader, fetchVer));
805,658!
87

88
    int32_t type = pReader->pHead->head.msgType;
805,614✔
89
    if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
805,614✔
90
        (IS_META_MSG(type) && pReader->cond.scanMeta)) {
194,279!
91
      TAOS_RETURN(walFetchBody(pReader));
611,335✔
92
    } else if (type == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
194,279✔
93
      TAOS_RETURN(walFetchBody(pReader));
148✔
94
    } else {
95
      TAOS_CHECK_RETURN(walSkipFetchBody(pReader));
194,131!
96

97
      fetchVer = pReader->curVersion;
194,180✔
98
    }
99
  }
100

101
  TAOS_RETURN(TSDB_CODE_FAILED);
15,069✔
102
}
103

104
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
186,398✔
105
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
54,665✔
106
void    walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver) { atomic_store_64(&pReader->skipToVersion, ver); }
1,000✔
107

108
// this function is NOT multi-thread safe, and no need to be.
109
int64_t walReaderGetSkipToVersion(SWalReader *pReader) {
54,665✔
110
  int64_t newVersion = pReader->skipToVersion;
54,665✔
111
  pReader->skipToVersion = 0;
54,665✔
112
  return newVersion;
54,665✔
113
}
114

115
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
62,721✔
116
  *sver = walGetFirstVer(pReader->pWal);
62,721✔
117
  int64_t lastVer = walGetLastVer(pReader->pWal);
62,721✔
118
  int64_t committedVer = walGetCommittedVer(pReader->pWal);
62,722✔
119
  *ever = pReader->cond.scanUncommited ? lastVer : committedVer;
62,722!
120
}
62,722✔
121

122
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) {
82,177✔
123
  // if offset version is small than first version , let's seek to first version
124
  TAOS_UNUSED(taosThreadRwlockRdlock(&pWalReader->pWal->mutex));
82,177✔
125
  int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
82,176✔
126
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWalReader->pWal->mutex));
82,175✔
127

128
  if (pOffset->version < firstVer) {
82,177✔
129
    pOffset->version = firstVer;
48✔
130
  }
131
}
82,177✔
132

133
static int32_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
356,453✔
134
  int64_t ret = 0;
356,453✔
135

136
  TdFilePtr pIdxTFile = pReader->pIdxFile;
356,453✔
137
  TdFilePtr pLogTFile = pReader->pLogFile;
356,453✔
138

139
  // seek position
140
  int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
356,453✔
141
  ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
356,453✔
142
  if (ret < 0) {
356,729!
143
    wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
×
144
           ver, offset, terrstr());
145

146
    TAOS_RETURN(terrno);
×
147
  }
148
  SWalIdxEntry entry = {0};
356,729✔
149
  if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
356,729!
150
    if (ret < 0) {
×
151
      wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
×
152

153
      TAOS_RETURN(terrno);
×
154
    } else {
155
      wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %ld",
×
156
             pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
157

158
      TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
159
    }
160
  }
161

162
  ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
356,983✔
163
  if (ret < 0) {
356,922!
164
    wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
×
165
           ver, entry.offset, terrstr());
166

167
    TAOS_RETURN(terrno);
×
168
  }
169

170
  TAOS_RETURN(TSDB_CODE_SUCCESS);
356,922✔
171
}
172

173
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
350,877✔
174
  char fnameStr[WAL_FILE_LEN] = {0};
350,877✔
175

176
  TAOS_UNUSED(taosCloseFile(&pReader->pIdxFile));
350,877✔
177
  TAOS_UNUSED(taosCloseFile(&pReader->pLogFile));
350,711✔
178

179
  walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
350,548✔
180
  TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
350,938✔
181
  if (pLogFile == NULL) {
350,697!
182
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
×
183

184
    TAOS_RETURN(terrno);
×
185
  }
186

187
  pReader->pLogFile = pLogFile;
350,697✔
188

189
  walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
350,697✔
190
  TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
350,855✔
191
  if (pIdxFile == NULL) {
350,585!
192
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
×
193

194
    TAOS_RETURN(terrno);
×
195
  }
196

197
  pReader->pIdxFile = pIdxFile;
350,585✔
198

199
  pReader->curFileFirstVer = fileFirstVer;
350,585✔
200

201
  TAOS_RETURN(TSDB_CODE_SUCCESS);
350,585✔
202
}
203

204
static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
356,999✔
205
  SWal *pWal = pReader->pWal;
356,999✔
206

207
  // bsearch in fileSet
208
  SWalFileInfo tmpInfo;
209
  tmpInfo.firstVer = ver;
356,999✔
210
  TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
356,999✔
211
  SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
356,994✔
212
  if (gloablPRet == NULL) {
356,831!
213
    wError("failed to find WAL log file with ver:%" PRId64, ver);
×
214
    TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
×
215
    TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
×
216
  }
217
  SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo));
356,831✔
218
  if (pRet == NULL) {
356,798!
219
    wError("failed to allocate memory for localRet");
×
220
    TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
×
221
    TAOS_RETURN(terrno);
×
222
  }
223
  TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo));
356,798✔
224
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
356,798✔
225
  if (pReader->curFileFirstVer != pRet->firstVer) {
356,900✔
226
    // error code was set inner
227
    TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet);
350,943!
228
  }
229

230
  // error code was set inner
231
  TAOS_CHECK_RETURN_WITH_FREE(walReadSeekFilePos(pReader, pRet->firstVer, ver), pRet);
356,520!
232
  taosMemoryFree(pRet);
356,803✔
233
  wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
356,750✔
234

235
  pReader->curVersion = ver;
356,755✔
236

237
  TAOS_RETURN(TSDB_CODE_SUCCESS);
356,755✔
238
}
239

240
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
356,491✔
241
  SWal *pWal = pReader->pWal;
356,491✔
242
  if (ver == pReader->curVersion) {
356,491✔
243
    wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
5!
244

245
    TAOS_RETURN(TSDB_CODE_SUCCESS);
5✔
246
  }
247

248
  if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
356,486!
249
    wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
133!
250
          ver, pWal->vers.firstVer, pWal->vers.lastVer);
251

252
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
133✔
253
  }
254

255
  TAOS_CHECK_RETURN(walReadSeekVerImpl(pReader, ver));
356,353!
256

257
  TAOS_RETURN(TSDB_CODE_SUCCESS);
355,845✔
258
}
259

260
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
945,559✔
261
  int64_t code;
262
  int64_t contLen;
263
  bool    seeked = false;
945,559✔
264

265
  // TODO: valid ver
266
  if (ver > pRead->pWal->vers.commitVer) {
945,559!
267
    TAOS_RETURN(TSDB_CODE_FAILED);
×
268
  }
269

270
  if (pRead->curVersion != ver) {
945,559✔
271
    TAOS_CHECK_RETURN(walReaderSeekVer(pRead, ver));
8,452!
272

273
    seeked = true;
8,453✔
274
  }
275

276
  while (1) {
277
    contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
946,227✔
278
    if (contLen == sizeof(SWalCkHead)) {
946,367✔
279
      break;
945,696✔
280
    } else if (contLen == 0 && !seeked) {
671!
281
      TAOS_CHECK_RETURN(walReadSeekVerImpl(pRead, ver));
667!
282

283
      seeked = true;
667✔
284
      continue;
667✔
285
    } else {
286
      if (contLen < 0) {
4!
287
        TAOS_RETURN(terrno);
×
288
      } else {
289
        TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
4✔
290
      }
291
    }
292
  }
293

294
  code = walValidHeadCksum(pRead->pHead);
945,696✔
295

296
  if (code != 0) {
945,567!
297
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64,
×
298
           pRead->pWal->cfg.vgId, ver, pRead->readerId);
299

300
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
301
  }
302

303
  TAOS_RETURN(TSDB_CODE_SUCCESS);
945,567✔
304
}
305

306
int32_t walSkipFetchBody(SWalReader *pRead) {
203,808✔
307
  wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 ", applied:%" PRId64
203,808✔
308
         ", 0x%" PRIx64,
309
         pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
310
         pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);
311

312
  int32_t plainBodyLen = pRead->pHead->head.bodyLen;
203,852✔
313
  int32_t cryptedBodyLen = plainBodyLen;
203,852✔
314
  // TODO: dmchen emun
315
  if (pRead->pWal->cfg.encryptAlgorithm == 1) {
203,852!
316
    cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
×
317
  }
318
  int64_t ret = taosLSeekFile(pRead->pLogFile, cryptedBodyLen, SEEK_CUR);
203,852✔
319
  if (ret < 0) {
203,878!
320
    TAOS_RETURN(terrno);
×
321
  }
322

323
  pRead->curVersion++;
203,878✔
324

325
  TAOS_RETURN(TSDB_CODE_SUCCESS);
203,878✔
326
}
327

328
int32_t walFetchBody(SWalReader *pRead) {
733,581✔
329
  SWalCont *pReadHead = &pRead->pHead->head;
733,581✔
330
  int64_t   ver = pReadHead->version;
733,581✔
331
  int32_t   vgId = pRead->pWal->cfg.vgId;
733,581✔
332
  int64_t   id = pRead->readerId;
733,581✔
333
  SWalVer  *pVer = &pRead->pWal->vers;
733,581✔
334

335
  wDebug("vgId:%d, fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 ", applied:%" PRId64
733,581✔
336
         ", 0x%" PRIx64,
337
         vgId, ver, pVer->firstVer, pVer->commitVer, pVer->lastVer, pVer->appliedVer, id);
338

339
  int32_t plainBodyLen = pReadHead->bodyLen;
733,588✔
340
  int32_t cryptedBodyLen = plainBodyLen;
733,588✔
341

342
  // TODO: dmchen emun
343
  if (pRead->pWal->cfg.encryptAlgorithm == 1) {
733,588!
344
    cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
×
345
  }
346

347
  if (pRead->capacity < cryptedBodyLen) {
733,588✔
348
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
5,956✔
349
    if (ptr == NULL) {
5,956!
350
      TAOS_RETURN(terrno);
×
351
    }
352
    pRead->pHead = ptr;
5,956✔
353
    pReadHead = &pRead->pHead->head;
5,956✔
354
    pRead->capacity = cryptedBodyLen;
5,956✔
355
  }
356

357
  if (cryptedBodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, cryptedBodyLen)) {
733,588!
358
    if (plainBodyLen < 0) {
×
359
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%" PRIx64, vgId,
×
360
             pReadHead->version, ver, tstrerror(terrno), id);
361

362
      TAOS_RETURN(terrno);
×
363
    } else {
364
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64
×
365
             ", since file corrupted, 0x%" PRIx64,
366
             vgId, pReadHead->version, ver, id);
367

368
      TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
369
    }
370
  }
371

372
  if (pReadHead->version != ver) {
733,649!
373
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", 0x%" PRIx64, vgId,
×
374
           pReadHead->version, ver, id);
375

376
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
377
  }
378

379
  TAOS_CHECK_RETURN(decryptBody(&pRead->pWal->cfg, pRead->pHead, plainBodyLen, __FUNCTION__));
733,649!
380

381
  if (walValidBodyCksum(pRead->pHead) != 0) {
733,608!
382
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver,
×
383
           id);
384

385
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
386
  }
387

388
  pRead->curVersion++;
733,599✔
389

390
  TAOS_RETURN(TSDB_CODE_SUCCESS);
733,599✔
391
}
392

393
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
340,972✔
394
  wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
340,972✔
395
  int64_t contLen;
396
  int32_t code = 0;
340,972✔
397
  bool    seeked = false;
340,972✔
398

399
  if (walIsEmpty(pReader->pWal)) {
340,972!
400
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
×
401
  }
402

403
  if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
340,961!
404
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
118!
405
           ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
406

407
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
118✔
408
  }
409

410
  if (taosThreadMutexLock(&pReader->mutex) != 0) {
340,843!
411
    wError("vgId:%d, failed to lock mutex", pReader->pWal->cfg.vgId);
×
412
  }
413

414
  if (pReader->curVersion != ver) {
340,867!
415
    code = walReaderSeekVer(pReader, ver);
340,867✔
416
    if (code) {
340,024!
UNCOV
417
      wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
×
UNCOV
418
      TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
419

420
      TAOS_RETURN(code);
×
421
    }
422

423
    seeked = true;
340,057✔
424
  }
425

426
  while (1) {
427
    contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
340,057✔
428
    if (contLen == sizeof(SWalCkHead)) {
340,842!
429
      break;
340,852✔
430
    } else if (contLen == 0 && !seeked) {
×
431
      code = walReadSeekVerImpl(pReader, ver);
×
432
      if (code) {
×
433
        TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
434

435
        TAOS_RETURN(code);
×
436
      }
437
      seeked = true;
×
438
      continue;
×
439
    } else {
440
      wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
×
441
             pReader->pWal->cfg.vgId, ver, terrstr());
442
      TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
443

444
      if (contLen < 0) {
×
445
        TAOS_RETURN(terrno);
×
446
      } else {
447
        TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
448
      }
449
    }
450
  }
451

452
  code = walValidHeadCksum(pReader->pHead);
340,852✔
453
  if (code != 0) {
340,683!
454
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
×
455
           ver);
456
    TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
457

458
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
459
  }
460

461
  int32_t plainBodyLen = pReader->pHead->head.bodyLen;
340,683✔
462
  int32_t cryptedBodyLen = plainBodyLen;
340,683✔
463

464
  // TODO: dmchen emun
465
  if (pReader->pWal->cfg.encryptAlgorithm == 1) {
340,683!
466
    cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
×
467
  }
468

469
  if (pReader->capacity < cryptedBodyLen) {
340,683✔
470
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
3,410✔
471
    if (ptr == NULL) {
3,412!
472
      TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
473

474
      TAOS_RETURN(terrno);
×
475
    }
476
    pReader->pHead = ptr;
3,412✔
477
    pReader->capacity = cryptedBodyLen;
3,412✔
478
  }
479

480
  if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) {
340,685✔
481
    wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
40!
482
           pReader->pWal->cfg.vgId, ver, terrstr());
483
    TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
40✔
484

485
    if (contLen < 0) {
×
486
      TAOS_RETURN(terrno);
×
487
    } else {
488
      TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
489
    }
490
  }
491

492
  if (pReader->pHead->head.version != ver) {
340,758!
493
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
×
494
           pReader->pHead->head.version, ver);
495
    //    pReader->curInvalid = 1;
496
    TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
497

498
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
499
  }
500

501
  code = decryptBody(&pReader->pWal->cfg, pReader->pHead, plainBodyLen, __FUNCTION__);
340,758✔
502
  if (code) {
340,611!
503
    TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
504

505
    TAOS_RETURN(code);
×
506
  }
507

508
  code = walValidBodyCksum(pReader->pHead);
340,611✔
509
  if (code != 0) {
340,626!
510
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
×
511
           ver);
512
    uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, plainBodyLen);
×
513
    uint32_t logCkSum = pReader->pHead->cksumBody;
×
514
    wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
×
515
    //    pReader->curInvalid = 1;
516

517
    TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
×
518

519
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
520
  }
521
  pReader->curVersion++;
340,626✔
522

523
  TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
340,626✔
524

525
  TAOS_RETURN(TSDB_CODE_SUCCESS);
340,778✔
526
}
527

528
int32_t decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func) {
1,074,451✔
529
  // TODO: dmchen emun
530
  if (cfg->encryptAlgorithm == 1) {
1,074,451!
531
    int32_t cryptedBodyLen = ENCRYPTED_LEN(plainBodyLen);
×
532
    char   *newBody = taosMemoryMalloc(cryptedBodyLen);
×
533
    if (!newBody) {
×
534
      TAOS_RETURN(terrno);
×
535
    }
536

537
    SCryptOpts opts;
538
    opts.len = cryptedBodyLen;
×
539
    opts.source = pHead->head.body;
×
540
    opts.result = newBody;
×
541
    opts.unitLen = 16;
×
542
    TAOS_UNUSED(strncpy((char *)opts.key, cfg->encryptKey, 16));
×
543

544
    int32_t count = CBC_Decrypt(&opts);
×
545

546
    // wDebug("CBC_Decrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", count, plainBodyLen, func);
547

548
    (void)memcpy(pHead->head.body, newBody, plainBodyLen);
×
549

550
    taosMemoryFree(newBody);
×
551
  }
552

553
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1,074,451✔
554
}
555

556
void walReadReset(SWalReader *pReader) {
340,739✔
557
  if ((taosThreadMutexLock(&pReader->mutex)) != 0) {
340,739!
558
    wError("vgId:%d, failed to lock mutex", pReader->pWal->cfg.vgId);
×
559
  }
560

561
  TAOS_UNUSED(taosCloseFile(&pReader->pIdxFile));
340,875✔
562
  TAOS_UNUSED(taosCloseFile(&pReader->pLogFile));
340,991✔
563
  pReader->curFileFirstVer = -1;
341,008✔
564
  pReader->curVersion = -1;
341,008✔
565
  TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
341,008✔
566
}
341,015✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc