• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.88
/source/libs/wal/src/walMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "os.h"
18
#include "taoserror.h"
19
#include "tglobal.h"
20
#include "tutil.h"
21
#include "walInt.h"
22

23
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
3✔
24
  return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
9!
25
}
26

27
bool FORCE_INLINE walIsEmpty(SWal* pWal) {
231,179✔
28
  return (pWal->vers.firstVer == -1 || pWal->vers.lastVer < pWal->vers.firstVer);  // [firstVer, lastVer + 1)
231,182!
29
}
30

31
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
186,352✔
32

33
int64_t FORCE_INLINE walGetSnapshotVer(SWal* pWal) { return pWal->vers.snapshotVer; }
2,296,951✔
34

35
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
11,897,021✔
36

37
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }
2,810,767✔
38

39
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }
519,293✔
40

41
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int64_t metaVer, char* buf) {
42
  return snprintf(buf, WAL_FILE_LEN, "%s%smeta-ver%" PRIi64, pWal->path, TD_DIRSEP, metaVer);
204,622✔
43
}
44

45
static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
46
  return snprintf(buf, WAL_FILE_LEN, "%s%smeta-ver.tmp", pWal->path, TD_DIRSEP);
100,798✔
47
}
48

49
FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer) {
6✔
50
  int32_t       code = 0, lino = 0;
988✔
51
  int32_t       sz = taosArrayGetSize(pWal->fileInfoSet);
6✔
52
  int64_t       retVer = -1;
988✔
53
  void*         ptr = NULL;
988✔
54
  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
988✔
55

56
  char fnameStr[WAL_FILE_LEN];
57
  walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
988✔
58

59
  int64_t fileSize = 0;
988✔
60
  TAOS_CHECK_GOTO(taosStatFile(fnameStr, &fileSize, NULL, NULL), &lino, _err);
988!
61

62
  TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
988✔
63
  TSDB_CHECK_NULL(pFile, code, lino, _err, terrno);
988!
64

65
  // ensure size as non-negative
66
  pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
988✔
67

68
  int64_t  stepSize = WAL_SCAN_BUF_SIZE;
988✔
69
  uint64_t magic = WAL_MAGIC;
988✔
70
  int64_t  walCkHeadSz = sizeof(SWalCkHead);
988✔
71
  int64_t  end = fileSize;
988✔
72
  int64_t  capacity = 0;
988✔
73
  int64_t  readSize = 0;
988✔
74
  char*    buf = NULL;
988✔
75
  int64_t  offset = TMIN(pFileInfo->fileSize, fileSize);
988✔
76
  int64_t  lastEntryBeginOffset = 0;
988✔
77
  int64_t  lastEntryEndOffset = 0;
988✔
78
  int64_t  recordLen = 0;
988✔
79
  bool     forwardStage = false;
988✔
80

81
  // check recover size
82
  if (2 * tsWalFsyncDataSizeLimit + offset < end) {
988!
83
    wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e, %" PRId64 " bytes), offset:%" PRId64
×
84
          ", end:%" PRId64 ", file:%s",
85
          pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr);
86
  }
87

88
  // search for the valid last WAL entry, e.g, block by block
89
  while (1) {
×
90
    offset = (lastEntryEndOffset > 0) ? offset : TMAX(0, offset - stepSize + walCkHeadSz - 1);
988!
91
    end = TMIN(offset + stepSize, fileSize);
988✔
92

93
    readSize = end - offset;
988✔
94
    capacity = readSize + sizeof(magic);
988✔
95

96
    ptr = taosMemoryRealloc(buf, capacity);
988!
97
    TSDB_CHECK_NULL(ptr, code, lino, _err, terrno);
988!
98
    buf = ptr;
988✔
99

100
    int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
988✔
101
    if (ret < 0) {
988!
102
      wError("vgId:%d, failed to lseek file since %s, offset:%" PRId64, pWal->cfg.vgId, strerror(ERRNO), offset);
×
103
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
104
    }
105

106
    if (readSize != taosReadFile(pFile, buf, readSize)) {
988!
107
      wError("vgId:%d, failed to read file %s since %s, readSize:%" PRId64, pWal->cfg.vgId, fnameStr, strerror(ERRNO),
×
108
             readSize);
109
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
110
    }
111

112
    char*       candidate = NULL;
988✔
113
    char*       haystack = buf;
988✔
114
    int64_t     pos = 0;
988✔
115
    SWalCkHead* logContent = NULL;
988✔
116

117
    while (true) {
1,304✔
118
      forwardStage = (lastEntryEndOffset > 0 || offset == 0);
2,292!
119
      terrno = TSDB_CODE_SUCCESS;
2,292✔
120
      if (forwardStage) {
2,292!
121
        candidate = (readSize - (haystack - buf)) > 0 ? haystack : NULL;
2,292!
122
      } else {
123
        candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(magic));
×
124
      }
125

126
      if (candidate == NULL) break;
2,292!
127
      pos = candidate - buf;
1,316✔
128

129
      // validate head
130
      int64_t len = readSize - pos;
1,316✔
131
      if (len < walCkHeadSz) {
1,316!
132
        break;
×
133
      }
134

135
      logContent = (SWalCkHead*)(buf + pos);
1,316✔
136
      if (walValidHeadCksum(logContent) != 0) {
1,316✔
137
        code = TSDB_CODE_WAL_CHKSUM_MISMATCH;
6✔
138
        wWarn("vgId:%d, failed to validate checksum of wal entry header, offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
6!
139
              offset + pos, fnameStr);
140
        haystack = buf + pos + 1;
6✔
141
        if (forwardStage) {
6!
142
          break;
6✔
143
        } else {
144
          continue;
×
145
        }
146
      }
147

148
      // validate body
149
      int32_t cryptedBodyLen = logContent->head.bodyLen;
1,310✔
150
      if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
1,310!
151
        cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
×
152
      }
153
      recordLen = walCkHeadSz + cryptedBodyLen;
1,310✔
154
      if (len < recordLen) {
1,310!
155
        int64_t extraSize = recordLen - len;
×
156
        if (capacity < readSize + extraSize + sizeof(magic)) {
×
157
          capacity += extraSize;
×
158
          void* ptr = taosMemoryRealloc(buf, capacity);
×
159
          TSDB_CHECK_NULL(ptr, code, lino, _err, terrno);
×
160
          buf = ptr;
×
161
        }
162
        int64_t ret = taosLSeekFile(pFile, offset + readSize, SEEK_SET);
×
163
        if (ret < 0) {
×
164
          wError("vgId:%d, failed to lseek file %s since %s, offset:%" PRId64, pWal->cfg.vgId, fnameStr,
×
165
                 strerror(terrno), offset);
166
          code = terrno;
×
167
          break;
×
168
        }
169
        if (extraSize != taosReadFile(pFile, buf + readSize, extraSize)) {
×
170
          wError("vgId:%d, failed to read file %s since %s, offset:%" PRId64 ", extraSize:%" PRId64,
×
171
                 pWal->cfg.vgId, fnameStr, strerror(ERRNO), offset + readSize, extraSize);
172
          code = terrno;
×
173
          break;
×
174
        }
175
      }
176

177
      logContent = (SWalCkHead*)(buf + pos);
1,310✔
178
      TAOS_CHECK_GOTO(decryptBody(&pWal->cfg, logContent, logContent->head.bodyLen, __FUNCTION__), &lino, _err);
1,310!
179

180
      if (walValidBodyCksum(logContent) != 0) {
1,310✔
181
        code = TSDB_CODE_WAL_CHKSUM_MISMATCH;
6✔
182
        wWarn("vgId:%d, failed to validate checksum of wal entry body, offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
6!
183
              offset + pos, fnameStr);
184
        haystack = buf + pos + 1;
6✔
185
        if (forwardStage) {
6!
186
          break;
6✔
187
        } else {
188
          continue;
×
189
        }
190
      }
191

192
      // found one
193
      retVer = taosGetInt64Aligned(&logContent->head.version);
1,304✔
194
      lastEntryBeginOffset = offset + pos;
1,304✔
195
      lastEntryEndOffset = offset + pos + recordLen;
1,304✔
196

197
      // try next
198
      haystack = buf + pos + recordLen;
1,304✔
199
    }
200

201
    offset = (lastEntryEndOffset > 0) ? lastEntryEndOffset : offset;
988!
202
    if (forwardStage && (terrno != TSDB_CODE_SUCCESS || end == fileSize)) break;
988!
203
  }
204

205
  if (retVer < 0) {
988!
206
    code = TSDB_CODE_WAL_LOG_NOT_EXIST;
951✔
207
  }
208

209
  // truncate file
210
  if (lastEntryEndOffset != fileSize) {
988!
211
    if(fileIdx < sz - 1){
12!
212
      wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
×
213
            lastEntryEndOffset, fileSize);
214
            
215
      if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
×
216
        wError("vgId:%d, failed to truncate file %s since %s", pWal->cfg.vgId, fnameStr, strerror(terrno));
×
217
        TAOS_CHECK_GOTO(terrno, &lino, _err);
×
218
      } 
219
    }
220
    else{
221
      wWarn("vgId:%d, skip to truncate file in repair meta %s to %" PRId64 ", orig size %" PRId64 " but fileIdx:%d is invalid",
12!
222
            pWal->cfg.vgId, fnameStr, lastEntryEndOffset, fileSize, fileIdx);
223
    }
224

225
    if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
12!
226
      wError("failed to fsync file %s since %s", fnameStr, strerror(ERRNO));
×
227
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
228
    }
229
  }
230

231
  pFileInfo->fileSize = lastEntryEndOffset;
988✔
232

233
_err:
988✔
234
  if (code != 0) {
988!
235
    wError("vgId:%d, failed at line %d to scan log file %s since %s", pWal->cfg.vgId, lino, fnameStr, tstrerror(code));
963!
236
  }
237
  TAOS_UNUSED(taosCloseFile(&pFile));
988✔
238
  taosMemoryFree(buf);
988!
239
  *lastVer = retVer;
988✔
240

241
  TAOS_RETURN(code);
988✔
242
}
243

244
static int32_t walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
14,440✔
245
  int metaFileNum = taosArrayGetSize(metaLogList);
14,440✔
246
  int actualFileNum = taosArrayGetSize(actualLogList);
14,440✔
247
  int j = 0;
14,440✔
248

249
  // both of the lists in asc order
250
  for (int i = 0; i < actualFileNum; i++) {
21,117✔
251
    SWalFileInfo* pLogInfo = taosArrayGet(actualLogList, i);
6,677✔
252
    while (j < metaFileNum) {
6,677✔
253
      SWalFileInfo* pMetaInfo = taosArrayGet(metaLogList, j);
6,674✔
254
      if (pMetaInfo->firstVer < pLogInfo->firstVer) {
6,674!
255
        j++;
×
256
      } else if (pMetaInfo->firstVer == pLogInfo->firstVer) {
6,674!
257
        (*pLogInfo) = *pMetaInfo;
6,674✔
258
        j++;
6,674✔
259
        break;
6,674✔
260
      } else {
261
        break;
×
262
      }
263
    }
264
  }
265

266
  taosArrayClear(metaLogList);
14,440✔
267

268
  for (int i = 0; i < actualFileNum; i++) {
21,117✔
269
    SWalFileInfo* pFileInfo = taosArrayGet(actualLogList, i);
6,677✔
270
    if (NULL == taosArrayPush(metaLogList, pFileInfo)) {
6,677!
271
      TAOS_RETURN(terrno);
×
272
    }
273
  }
274

275
  return TSDB_CODE_SUCCESS;
14,440✔
276
}
277

278
static void walAlignVersions(SWal* pWal) {
14,440✔
279
  if (pWal->cfg.committed > 0 && pWal->cfg.committed != pWal->vers.snapshotVer) {
14,440✔
280
    wWarn("vgId:%d, snapshot index:%" PRId64 " in wal is different from commited index:%" PRId64
80!
281
          ", in vnode/mnode, align with it",
282
          pWal->cfg.vgId, pWal->vers.snapshotVer, pWal->cfg.committed);
283
    pWal->vers.snapshotVer = pWal->cfg.committed;
80✔
284
  }
285
  if (pWal->vers.snapshotVer < 0 && pWal->vers.firstVer > 0) {
14,440!
286
    wWarn("vgId:%d, snapshot index:%" PRId64 " in wal is an invalid value, align it with first index:%" PRId64,
×
287
          pWal->cfg.vgId, pWal->vers.snapshotVer, pWal->vers.firstVer);
288
    pWal->vers.snapshotVer = pWal->vers.firstVer;
×
289
  }
290
  if (pWal->vers.firstVer > pWal->vers.snapshotVer + 1) {
14,440!
291
    wWarn("vgId:%d, first index:%" PRId64 " is larger than snapshot index:%" PRId64 " + 1, align with it",
×
292
          pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.snapshotVer);
293
    pWal->vers.firstVer = pWal->vers.snapshotVer + 1;
×
294
  }
295
  if (pWal->vers.lastVer < pWal->vers.snapshotVer) {
14,440✔
296
    wWarn("vgId:%d, last index:%" PRId64 " is less than snapshot index:%" PRId64 ", align with it", pWal->cfg.vgId,
115!
297
          pWal->vers.lastVer, pWal->vers.snapshotVer);
298
    if (pWal->vers.lastVer < pWal->vers.firstVer) {
115✔
299
      pWal->vers.firstVer = pWal->vers.snapshotVer + 1;
35✔
300
    }
301
    pWal->vers.lastVer = pWal->vers.snapshotVer;
115✔
302
  }
303
  // reset commitVer and appliedVer
304
  pWal->vers.commitVer = pWal->vers.snapshotVer;
14,440✔
305
  pWal->vers.appliedVer = pWal->vers.snapshotVer;
14,440✔
306
  wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
14,440✔
307
}
14,440✔
308

309
static int32_t walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
14,440✔
310
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,440✔
311
  int32_t fileIdx = -1;
14,440✔
312
  int32_t lastCloseTs = 0;
14,440✔
313
  char    fnameStr[WAL_FILE_LEN] = {0};
14,440✔
314

315
  while (++fileIdx < sz - 1) {
18,134✔
316
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
3,694✔
317
    if (pFileInfo->closeTs != -1) {
3,694✔
318
      lastCloseTs = pFileInfo->closeTs;
3,691✔
319
      continue;
3,691✔
320
    }
321

322
    walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
3✔
323
    int64_t mtime = 0;
3✔
324
    if (taosStatFile(fnameStr, NULL, &mtime, NULL) < 0) {
3!
325
      wError("vgId:%d, failed to stat file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
×
326

327
      TAOS_RETURN(terrno);
×
328
    }
329

330
    if (updateMeta != NULL) *updateMeta = true;
3!
331
    if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs;
3!
332
    pFileInfo->closeTs = mtime;
3✔
333
    lastCloseTs = pFileInfo->closeTs;
3✔
334
  }
335

336
  TAOS_RETURN(TSDB_CODE_SUCCESS);
14,440✔
337
}
338

339
static int32_t walLogEntriesComplete(const SWal* pWal) {
14,440✔
340
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,440✔
341
  bool    complete = true;
14,440✔
342
  int32_t fileIdx = -1;
14,440✔
343
  int64_t index = pWal->vers.firstVer;
14,440✔
344

345
  while (++fileIdx < sz) {
21,117✔
346
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
6,677✔
347
    if (pFileInfo->firstVer != index) {
6,677!
348
      break;
×
349
    }
350
    index = pFileInfo->lastVer + ((fileIdx + 1 < sz) ? 1 : 0);
6,677✔
351
  }
352
  // empty is regarded as complete
353
  if (sz != 0) {
14,440✔
354
    complete = (index == pWal->vers.lastVer);
2,983✔
355
  }
356

357
  if (!complete) {
14,440!
358
    wError("vgId:%d, WAL log entries incomplete in range [%" PRId64 ", %" PRId64 "], index:%" PRId64
×
359
           ", snaphot index:%" PRId64,
360
           pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.lastVer, index, pWal->vers.snapshotVer);
361
    TAOS_RETURN(TSDB_CODE_WAL_LOG_INCOMPLETE);
×
362
  } else {
363
    TAOS_RETURN(TSDB_CODE_SUCCESS);
14,440✔
364
  }
365
}
366

367
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
982✔
368
  int32_t   code = TSDB_CODE_SUCCESS;
982✔
369
  int32_t   lino = 0;
982✔
370
  TdFilePtr pFile = NULL;
982✔
371

372
  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
982✔
373
  TSDB_CHECK_NULL(pFileInfo, code, lino, _exit, terrno);
982!
374

375
  char fnameStr[WAL_FILE_LEN];
376
  walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
982✔
377

378
  int64_t fileSize = 0;
982✔
379
  TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0);
982!
380

381
  int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
982✔
382
  int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
982✔
383

384
  if (fileSize <= lastEndOffset) TAOS_RETURN(TSDB_CODE_SUCCESS);
982✔
385

386
  pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
22✔
387
  TSDB_CHECK_NULL(pFile, code, lino, _exit, terrno);
22!
388

389
  wInfo("vgId:%d, trim idx file %s, size:%" PRId64 ", offset:%" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
22!
390
        lastEndOffset);
391

392
  TAOS_CHECK_EXIT(taosFtruncateFile(pFile, lastEndOffset));
22!
393

394
_exit:
22✔
395
  if (code != TSDB_CODE_SUCCESS) {
22!
396
    wError("vgId:%d, failed to trim idx file %s since %s", pWal->cfg.vgId, fnameStr, tstrerror(code));
×
397
  }
398

399
  (void)taosCloseFile(&pFile);
22✔
400
  TAOS_RETURN(code);
22✔
401
}
402

403
static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) {
46,336✔
404
  int32_t sz = taosArrayGetSize(fileSet);
46,336✔
405
  for (int32_t i = 0; i < sz; i++) {
73,041✔
406
    SWalFileInfo* pFileInfo = taosArrayGet(fileSet, i);
26,696✔
407
    wTrace("vgId:%d, %s-%d, first index:%" PRId64 ", last index:%" PRId64 ", fileSize:%" PRId64
26,696✔
408
           ", syncedOffset:%" PRId64 ", createTs:%" PRId64 ", closeTs:%" PRId64,
409
           vgId, str, i, pFileInfo->firstVer, pFileInfo->lastVer, pFileInfo->fileSize, pFileInfo->syncedOffset,
410
           pFileInfo->createTs, pFileInfo->closeTs);
411
  }
412
}
46,345✔
413

414
void walRegfree(regex_t* ptr) {
28,879✔
415
  if (ptr == NULL) {
28,879!
416
    return;
×
417
  }
418
  regfree(ptr);
28,879✔
419
}
420

421
int32_t walCheckAndRepairMeta(SWal* pWal) {
14,438✔
422
  // load log files, get first/snapshot/last version info
423
  int32_t     code = 0;
14,438✔
424
  int32_t     lino = 0;
14,438✔
425
  const char* logPattern = "^[0-9]+.log$";
14,438✔
426
  const char* idxPattern = "^[0-9]+.idx$";
14,438✔
427
  regex_t     logRegPattern, idxRegPattern;
428
  TdDirPtr    pDir = NULL;
14,438✔
429
  SArray*     actualLog = NULL;
14,438✔
430

431
  wInfo("vgId:%d, begin to repair meta, wal path:%s, first index:%" PRId64 ", last index:%" PRId64
14,438✔
432
        ", snapshot index:%" PRId64,
433
        pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
434

435
  TAOS_CHECK_EXIT_SET_CODE(regcomp(&logRegPattern, logPattern, REG_EXTENDED), code, terrno);
14,438!
436

437
  TAOS_CHECK_EXIT_SET_CODE(regcomp(&idxRegPattern, idxPattern, REG_EXTENDED), code, terrno);
14,440!
438

439
  pDir = taosOpenDir(pWal->path);
14,438✔
440
  TSDB_CHECK_NULL(pDir, code, lino, _exit, terrno);
14,438!
441

442
  actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
14,438✔
443

444
  // scan log files and build new meta
445
  TdDirEntryPtr pDirEntry;
446
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
59,675✔
447
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
45,245✔
448
    int   code = regexec(&logRegPattern, name, 0, NULL, 0);
45,244✔
449
    if (code == 0) {
45,238✔
450
      SWalFileInfo fileInfo;
451
      (void)memset(&fileInfo, -1, sizeof(SWalFileInfo));
6,673✔
452
      (void)sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
6,673✔
453
      TSDB_CHECK_NULL(taosArrayPush(actualLog, &fileInfo), code, lino, _exit, terrno);
6,674!
454
    }
455
  }
456

457
  taosArraySort(actualLog, compareWalFileInfo);
14,433✔
458

459
  wInfo("vgId:%d, actual log file, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
14,437✔
460
        (int32_t)taosArrayGetSize(actualLog));
461
  printFileSet(pWal->cfg.vgId, actualLog, "actual log file");
14,438✔
462

463
  int     metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
14,440✔
464
  int     actualFileNum = taosArrayGetSize(actualLog);
14,440✔
465
  int64_t firstVerPrev = pWal->vers.firstVer;
14,440✔
466
  int64_t lastVerPrev = pWal->vers.lastVer;
14,440✔
467
  int64_t totSize = 0;
14,440✔
468
  bool    updateMeta = (metaFileNum != actualFileNum);
14,440✔
469

470
  // rebuild meta of file info
471
  TAOS_CHECK_EXIT(walRebuildFileInfoSet(pWal->fileInfoSet, actualLog));
14,440!
472

473
  wInfo("vgId:%d, log file in meta, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
14,440✔
474
        (int32_t)taosArrayGetSize(pWal->fileInfoSet));
475
  printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "log file in meta");
14,440✔
476

477
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,440✔
478

479
  // scan and determine the lastVer
480
  int32_t fileIdx = sz;
14,440✔
481

482
  while (--fileIdx >= 0) {
21,117✔
483
    char          fnameStr[WAL_FILE_LEN];
484
    int64_t       fileSize = 0;
6,677✔
485
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
6,677✔
486

487
    walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
6,677✔
488
    wInfo("vgId:%d, check file %s", pWal->cfg.vgId, fnameStr);
6,677!
489
    TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL));
6,677!
490

491
    if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize &&
6,677✔
492
        !(fileIdx == sz - 1 && tsWalForceRepair)) {
5,695!
493
      wInfo("vgId:%d, file %s is valid, fileSize:%" PRId64 ", fileSize in meta:%" PRId64, pWal->cfg.vgId, fnameStr,
5,695!
494
            fileSize, pFileInfo->fileSize);
495
      totSize += pFileInfo->fileSize;
5,695✔
496
      continue;
5,695✔
497
    }
498
    wWarn("vgId:%d, going to repair file %s, fileSize:%" PRId64 ", fileSize in meta:%" PRId64 ", LastVer:%" PRId64
982!
499
          ", firstVer:%" PRId64 ", forceRepair:%d",
500
          pWal->cfg.vgId, fnameStr, fileSize, pFileInfo->fileSize, pFileInfo->lastVer, pFileInfo->firstVer,
501
          tsWalForceRepair);
502
    updateMeta = true;
982✔
503

504
    TAOS_CHECK_EXIT(walTrimIdxFile(pWal, fileIdx));
982!
505

506
    int64_t lastVer = -1;
982✔
507
    code = walScanLogGetLastVer(pWal, fileIdx, &lastVer);
982✔
508
    if (lastVer < 0) {
982✔
509
      if (code != TSDB_CODE_WAL_LOG_NOT_EXIST) {
951!
510
        wError("vgId:%d, failed to scan wal last index since %s", pWal->cfg.vgId, terrstr());
×
511
        goto _exit;;
×
512
      }
513
      // empty log file
514
      lastVer = pFileInfo->firstVer - 1;
951✔
515

516
      code = TSDB_CODE_SUCCESS;
951✔
517
    }
518
    wInfo("vgId:%d, repaired file %s, last index:%" PRId64 ", fileSize:%" PRId64 ", fileSize in meta:%" PRId64,
982!
519
          pWal->cfg.vgId, fnameStr, lastVer, fileSize, pFileInfo->fileSize);
520

521
    // update lastVer
522
    pFileInfo->lastVer = lastVer;
982✔
523
    totSize += pFileInfo->fileSize;
982✔
524
  }
525

526
  // reset vers info and so on
527
  actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
14,440✔
528
  pWal->writeCur = actualFileNum - 1;
14,440✔
529
  pWal->totSize = totSize;
14,440✔
530
  pWal->vers.lastVer = -1;
14,440✔
531
  if (actualFileNum > 0) {
14,440✔
532
    pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
2,983✔
533
    pWal->vers.lastVer = ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer;
2,983✔
534
  }
535
  walAlignVersions(pWal);
14,440✔
536

537
  // repair ts of files
538
  TAOS_CHECK_EXIT(walRepairLogFileTs(pWal, &updateMeta));
14,440!
539

540
  wInfo("vgId:%d, log file after repair, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
14,440✔
541
        (int32_t)taosArrayGetSize(pWal->fileInfoSet));
542
  printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "file after repair");
14,440✔
543
  // update meta file
544
  if (updateMeta) {
14,440✔
545
    TAOS_CHECK_EXIT(walSaveMeta(pWal));
982!
546
  }
547

548
  TAOS_CHECK_EXIT(walLogEntriesComplete(pWal));
14,440!
549

550
  wInfo("vgId:%d, success to repair meta, wal path:%s, first index:%" PRId64 ", last index:%" PRId64
14,440✔
551
        ", snapshot index:%" PRId64,
552
        pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
553
_exit:
84✔
554
  if (code != TSDB_CODE_SUCCESS) {
14,440!
555
    wError("vgId:%d, failed to repair meta since %s, at line:%d", pWal->cfg.vgId, tstrerror(code), lino);
×
556
  }
557
  taosArrayDestroy(actualLog);
14,440✔
558
  TAOS_UNUSED(taosCloseDir(&pDir));
14,440✔
559
  walRegfree(&logRegPattern);
14,440✔
560
  walRegfree(&idxRegPattern);
14,440✔
561

562
  return code;
14,439✔
563
}
564

565
static int32_t walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
1,261✔
566
  if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) return terrno;
1,261!
567

568
  if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) return terrno;
1,261!
569

570
  if (walValidHeadCksum(pCkHead) != 0) return TSDB_CODE_WAL_CHKSUM_MISMATCH;
1,261!
571

572
  return TSDB_CODE_SUCCESS;
1,261✔
573
}
574

575
static int32_t walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
6,661✔
576
  int32_t       code = 0, lino = 0;
6,661✔
577
  int32_t       sz = taosArrayGetSize(pWal->fileInfoSet);
6,661✔
578
  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
6,661✔
579
  char          fnameStr[WAL_FILE_LEN];
580
  walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
6,661✔
581
  char fLogNameStr[WAL_FILE_LEN];
582
  walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
6,662✔
583
  int64_t fileSize = 0;
6,662✔
584

585
  if (taosStatFile(fnameStr, &fileSize, NULL, NULL) < 0 && ERRNO != ENOENT) {
6,662!
586
    wError("vgId:%d, failed to stat file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
×
587

588
    TAOS_RETURN(terrno);
×
589
  }
590

591
  if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
6,662✔
592
    TAOS_RETURN(TSDB_CODE_SUCCESS);
6,637✔
593
  }
594

595
  // start to repair
596
  int64_t      offset = fileSize - fileSize % sizeof(SWalIdxEntry);
25✔
597
  TdFilePtr    pLogFile = NULL;
25✔
598
  TdFilePtr    pIdxFile = NULL;
25✔
599
  SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer - 1, .offset = -sizeof(SWalCkHead)};
25✔
600
  SWalCkHead   ckHead;
601
  (void)memset(&ckHead, 0, sizeof(ckHead));
25✔
602
  ckHead.head.version = idxEntry.ver;
25✔
603

604
  pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
25✔
605
  if (pIdxFile == NULL) {
25!
606
    wError("vgId:%d, failed to open file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
×
607
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
608
  }
609

610
  pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
25✔
611
  if (pLogFile == NULL) {
25!
612
    wError("vgId:%d, cannot open file %s since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
×
613
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
614
  }
615

616
  // determine the last valid entry end, i.e, offset
617
  while ((offset -= sizeof(SWalIdxEntry)) >= 0) {
25✔
618
    if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) {
6!
619
      wError("vgId:%d, failed to seek file %s since %s, offset:%" PRId64, pWal->cfg.vgId, fnameStr, strerror(terrno),
×
620
             offset);
621
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
622
    }
623

624
    if (taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
6!
625
      wError("vgId:%d, failed to read file %s since %s, offset:%" PRId64, pWal->cfg.vgId, fnameStr, strerror(terrno),
×
626
             offset);
627

628
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
629
    }
630

631
    if (idxEntry.ver > pFileInfo->lastVer) {
6!
632
      continue;
×
633
    }
634

635
    if (offset != (idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry)) {
6!
636
      continue;
×
637
    }
638

639
    if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
6!
640
      wWarn("vgId:%d, failed to read log file %s since %s, offset:%" PRId64 ", idx entry index:%" PRId64,
×
641
            pWal->cfg.vgId, fLogNameStr, terrstr(), idxEntry.offset, idxEntry.ver);
642
      continue;
×
643
    }
644

645
    if (idxEntry.ver == ckHead.head.version) {
6!
646
      break;
6✔
647
    }
648
  }
649
  offset += sizeof(SWalIdxEntry);
25✔
650

651
  /*A(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));*/
652

653
  // ftruncate idx file
654
  if (offset < fileSize) {
25!
655
    if (taosFtruncateFile(pIdxFile, offset) < 0) {
×
656
      wError("vgId:%d, failed to ftruncate file %s since %s, offset:%" PRId64, pWal->cfg.vgId, fnameStr, terrstr(),
×
657
             offset);
658
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
659
    }
660
  }
661

662
  // rebuild idx file
663
  if (taosLSeekFile(pIdxFile, 0, SEEK_END) < 0) {
25!
664
    wError("vgId:%d, failed to seek file %s since %s, offset:%" PRId64, pWal->cfg.vgId, fnameStr, terrstr(), offset);
×
665
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
666
  }
667

668
  int64_t count = 0;
25✔
669
  while (idxEntry.ver < pFileInfo->lastVer) {
1,280✔
670
    /*A(idxEntry.ver == ckHead.head.version);*/
671

672
    idxEntry.ver += 1;
1,255✔
673

674
    int32_t plainBodyLen = ckHead.head.bodyLen;
1,255✔
675
    int32_t cryptedBodyLen = plainBodyLen;
1,255✔
676
    if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
1,255!
677
      cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
×
678
    }
679
    idxEntry.offset += sizeof(SWalCkHead) + cryptedBodyLen;
1,255✔
680

681
    code = walReadLogHead(pLogFile, idxEntry.offset, &ckHead);
1,255✔
682
    if (code) {
1,255!
683
      wError("vgId:%d, failed to read wal log head since %s, index:%" PRId64 ", offset:%" PRId64 ", file:%s",
×
684
             pWal->cfg.vgId, tstrerror(code), idxEntry.ver, idxEntry.offset, fLogNameStr);
685
      TAOS_CHECK_GOTO(code, &lino, _err);
×
686
    }
687
    if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
1,255!
688
      wError("vgId:%d, failed to append file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
×
689
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
690
    }
691
    count++;
1,255✔
692
  }
693

694
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) {
25!
695
    wError("vgId:%d, faild to fsync file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
×
696
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(ERRNO), &lino, _err);
×
697
  }
698

699
  if (count > 0) {
25!
700
    wInfo("vgId:%d, rebuilt %" PRId64 " wal idx entries until last index:%" PRId64, pWal->cfg.vgId, count,
25!
701
          pFileInfo->lastVer);
702
  }
703

704
_err:
×
705
  if (code) {
25!
706
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
×
707
  }
708

709
  (void)taosCloseFile(&pLogFile);
25✔
710
  (void)taosCloseFile(&pIdxFile);
25✔
711

712
  TAOS_RETURN(code);
25✔
713
}
714

715
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
1,316✔
716
  int64_t ver = -1;
1,316✔
717
  int64_t totSize = 0;
1,316✔
718
  if (taosThreadRwlockRdlock(&pWal->mutex) != 0) {
1,316!
719
    wError("vgId:%d failed to lock %p", pWal->cfg.vgId, &pWal->mutex);
×
720
  }
721
  int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
1,316✔
722
  while (--fileIdx) {
4,333✔
723
    SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
3,017✔
724
    if (totSize >= bytes) {
3,017!
725
      ver = pInfo->lastVer;
×
726
      break;
×
727
    }
728
    totSize += pInfo->fileSize;
3,017✔
729
  }
730
  if (taosThreadRwlockUnlock(&pWal->mutex) != 0) {
1,316!
731
    wError("vgId:%d failed to lock %p", pWal->cfg.vgId, &pWal->mutex);
×
732
  }
733
  return ver + 1;
1,316✔
734
}
735

736
int32_t walCheckAndRepairIdx(SWal* pWal) {
14,439✔
737
  int32_t code = 0;
14,439✔
738
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,439✔
739
  int32_t fileIdx = sz;
14,439✔
740

741
  while (--fileIdx >= 0) {
21,100✔
742
    code = walCheckAndRepairIdxFile(pWal, fileIdx);
6,661✔
743
    if (code) {
6,661!
744
      wError("vgId:%d, failed to repair idx file since %s, fileIdx:%d", pWal->cfg.vgId, terrstr(), fileIdx);
×
745

746
      TAOS_RETURN(code);
×
747
    }
748
  }
749

750
  TAOS_RETURN(code);
14,439✔
751
}
752

753
int32_t walRollFileInfo(SWal* pWal) {
37,016✔
754
  int64_t ts = taosGetTimestampSec();
37,016✔
755

756
  SArray* pArray = pWal->fileInfoSet;
37,015✔
757
  if (taosArrayGetSize(pArray) != 0) {
37,015✔
758
    SWalFileInfo* pInfo = taosArrayGetLast(pArray);
25,622✔
759
    pInfo->lastVer = pWal->vers.lastVer;
25,622✔
760
    pInfo->closeTs = ts;
25,622✔
761
  }
762

763
  // TODO: change to emplace back
764
  SWalFileInfo* pNewInfo = taosMemoryMalloc(sizeof(SWalFileInfo));
37,015!
765
  if (pNewInfo == NULL) {
37,016!
766
    TAOS_RETURN(terrno);
×
767
  }
768
  pNewInfo->firstVer = pWal->vers.lastVer + 1;
37,016✔
769
  pNewInfo->lastVer = -1;
37,016✔
770
  pNewInfo->createTs = ts;
37,016✔
771
  pNewInfo->closeTs = -1;
37,016✔
772
  pNewInfo->fileSize = 0;
37,016✔
773
  pNewInfo->syncedOffset = 0;
37,016✔
774
  if (!taosArrayPush(pArray, pNewInfo)) {
37,014!
775
    taosMemoryFree(pNewInfo);
×
776
    TAOS_RETURN(terrno);
×
777
  }
778

779
  taosMemoryFree(pNewInfo);
37,014!
780
  TAOS_RETURN(TSDB_CODE_SUCCESS);
37,015✔
781
}
782

783
int32_t walMetaSerialize(SWal* pWal, char** serialized) {
100,798✔
784
  char   buf[WAL_JSON_BUF_SIZE];
785
  int    sz = taosArrayGetSize(pWal->fileInfoSet);
100,798✔
786
  cJSON* pRoot = cJSON_CreateObject();
100,802✔
787
  cJSON* pMeta = cJSON_CreateObject();
100,804✔
788
  cJSON* pFiles = cJSON_CreateArray();
100,806✔
789
  cJSON* pField;
790
  if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
100,805!
791
    if (pRoot) {
×
792
      cJSON_Delete(pRoot);
×
793
    }
794
    if (pMeta) {
×
795
      cJSON_Delete(pMeta);
×
796
    }
797
    if (pFiles) {
×
798
      cJSON_Delete(pFiles);
×
799
    }
800

801
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
802
  }
803
  if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) goto _err;
100,805!
804
  snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.firstVer);
100,804✔
805
  if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) goto _err;
100,804!
806
  (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.snapshotVer);
100,804✔
807
  if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) goto _err;
100,804!
808
  (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.commitVer);
100,808✔
809
  if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) goto _err;
100,808!
810
  (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.lastVer);
100,809✔
811
  if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) goto _err;
100,809!
812

813
  if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) goto _err;
100,809!
814
  SWalFileInfo* pData = pWal->fileInfoSet->pData;
100,808✔
815
  for (int i = 0; i < sz; i++) {
16,457,528✔
816
    SWalFileInfo* pInfo = &pData[i];
16,358,520✔
817
    if (!cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject())) {
16,358,520!
818
      wInfo("vgId:%d, failed to add field to files", pWal->cfg.vgId);
×
819
    }
820
    if (pField == NULL) {
16,360,485!
821
      cJSON_Delete(pRoot);
×
822
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
823
    }
824
    // cjson only support int32_t or double
825
    // string are used to prohibit the loss of precision
826
    (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->firstVer);
16,360,485✔
827
    if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) goto _err;
16,360,485!
828
    (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->lastVer);
16,356,056✔
829
    if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) goto _err;
16,356,056!
830
    (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->createTs);
16,356,116✔
831
    if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) goto _err;
16,356,116!
832
    (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->closeTs);
16,356,633✔
833
    if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) goto _err;
16,356,633!
834
    (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->fileSize);
16,338,406✔
835
    if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) goto _err;
16,338,406!
836
  }
837
  char* pSerialized = cJSON_Print(pRoot);
99,008✔
838
  cJSON_Delete(pRoot);
100,807✔
839

840
  *serialized = pSerialized;
100,808✔
841

842
  TAOS_RETURN(TSDB_CODE_SUCCESS);
100,808✔
843
_err:
×
844
  cJSON_Delete(pRoot);
×
845
  return TSDB_CODE_FAILED;
×
846
}
847

848
int32_t walMetaDeserialize(SWal* pWal, const char* bytes) {
3,023✔
849
  /*A(taosArrayGetSize(pWal->fileInfoSet) == 0);*/
850
  cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField;
851
  pRoot = cJSON_Parse(bytes);
3,023✔
852
  if (!pRoot) goto _err;
3,023!
853
  pMeta = cJSON_GetObjectItem(pRoot, "meta");
3,023✔
854
  if (!pMeta) goto _err;
3,025!
855
  pField = cJSON_GetObjectItem(pMeta, "firstVer");
3,025✔
856
  if (!pField) goto _err;
3,019!
857
  pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField));
3,019✔
858
  pField = cJSON_GetObjectItem(pMeta, "snapshotVer");
3,018✔
859
  if (!pField) goto _err;
3,019!
860
  pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField));
3,019✔
861
  pField = cJSON_GetObjectItem(pMeta, "commitVer");
3,024✔
862
  if (!pField) goto _err;
3,023!
863
  pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField));
3,023✔
864
  pField = cJSON_GetObjectItem(pMeta, "lastVer");
3,022✔
865
  if (!pField) goto _err;
3,023!
866
  pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField));
3,023✔
867

868
  pFiles = cJSON_GetObjectItem(pRoot, "files");
3,021✔
869
  int sz = cJSON_GetArraySize(pFiles);
3,020✔
870
  // deserialize
871
  SArray* pArray = pWal->fileInfoSet;
3,020✔
872
  if (taosArrayEnsureCap(pArray, sz)) {
3,020!
873
    cJSON_Delete(pRoot);
×
874
    return terrno;
×
875
  }
876

877
  for (int i = 0; i < sz; i++) {
9,685✔
878
    pInfoJson = cJSON_GetArrayItem(pFiles, i);
6,661✔
879
    if (!pInfoJson) goto _err;
6,657!
880

881
    SWalFileInfo info = {0};
6,657✔
882

883
    pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
6,657✔
884
    if (!pField) goto _err;
6,659!
885
    info.firstVer = atoll(cJSON_GetStringValue(pField));
6,659✔
886
    pField = cJSON_GetObjectItem(pInfoJson, "lastVer");
6,661✔
887
    if (!pField) goto _err;
6,658!
888
    info.lastVer = atoll(cJSON_GetStringValue(pField));
6,658✔
889
    pField = cJSON_GetObjectItem(pInfoJson, "createTs");
6,662✔
890
    if (!pField) goto _err;
6,659!
891
    info.createTs = atoll(cJSON_GetStringValue(pField));
6,659✔
892
    pField = cJSON_GetObjectItem(pInfoJson, "closeTs");
6,659✔
893
    if (!pField) goto _err;
6,661!
894
    info.closeTs = atoll(cJSON_GetStringValue(pField));
6,661✔
895
    pField = cJSON_GetObjectItem(pInfoJson, "fileSize");
6,664✔
896
    if (!pField) goto _err;
6,660!
897
    info.fileSize = atoll(cJSON_GetStringValue(pField));
6,660✔
898
    if (!taosArrayPush(pArray, &info)) {
6,663!
899
      cJSON_Delete(pRoot);
×
900
      return terrno;
×
901
    }
902
  }
903
  pWal->fileInfoSet = pArray;
3,024✔
904
  pWal->writeCur = sz - 1;
3,024✔
905
  cJSON_Delete(pRoot);
3,024✔
906
  return TSDB_CODE_SUCCESS;
3,025✔
907

908
_err:
×
909
  cJSON_Delete(pRoot);
×
910
  return TSDB_CODE_FAILED;
×
911
}
912

913
static int walFindCurMetaVer(SWal* pWal, int64_t* pMetaVer) {
115,337✔
914
  const char* pattern = "^meta-ver[0-9]+$";
115,337✔
915
  regex_t     walMetaRegexPattern;
916
  int         ret = 0;
115,337✔
917

918
  if (pMetaVer) *pMetaVer = -1;
115,337!
919
  if ((ret = regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED)) != 0) {
115,337!
920
    char msgbuf[256] = {0};
×
921
    (void)regerror(ret, &walMetaRegexPattern, msgbuf, tListLen(msgbuf));
×
922
    wError("failed to compile wal meta pattern %s, error %s", pattern, msgbuf);
×
923
    return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
×
924
  }
925

926
  TdDirPtr pDir = taosOpenDir(pWal->path);
115,335✔
927
  if (pDir == NULL) {
115,338✔
928
    wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
3!
929
    regfree(&walMetaRegexPattern);
3✔
930
    return terrno;
×
931
  }
932

933
  TdDirEntryPtr pDirEntry;
934

935
  // find existing meta-ver[x].json
936
  int64_t metaVer = -1;
115,335✔
937
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
28,098,085✔
938
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
27,992,303✔
939
    int   code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
27,910,283✔
940
    if (code == 0) {
28,075,146✔
941
      (void)sscanf(name, "meta-ver%" PRIi64, &metaVer);
92,396✔
942
      wDebug("vgId:%d, wal find current meta:%s is the meta file, index:%" PRIi64, pWal->cfg.vgId, name, metaVer);
92,396✔
943
      break;
92,374✔
944
    }
945
    wDebug("vgId:%d, wal find current meta:%s is not meta file", pWal->cfg.vgId, name);
27,982,750✔
946
  }
947
  if (taosCloseDir(&pDir) != 0) {
113,175!
948
    wError("vgId:%d, failed to close dir, ret:%s", pWal->cfg.vgId, tstrerror(terrno));
×
949
    regfree(&walMetaRegexPattern);
×
950
    return terrno;
×
951
  }
952
  regfree(&walMetaRegexPattern);
115,346✔
953
  if (pMetaVer) *pMetaVer = metaVer;
115,312!
954
  return 0;
115,312✔
955
}
956

957
static void walUpdateSyncedOffset(SWal* pWal) {
100,800✔
958
  SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
100,800✔
959
  if (pFileInfo == NULL) return;
100,798✔
960
  pFileInfo->syncedOffset = pFileInfo->fileSize;
100,619✔
961
}
962

963
int32_t walSaveMeta(SWal* pWal) {
100,794✔
964
  int  code = 0, lino = 0;
100,794✔
965
  int64_t  metaVer = -1;
100,794✔
966
  char fnameStr[WAL_FILE_LEN];
967
  char tmpFnameStr[WAL_FILE_LEN];
968
  int       n;
969
  TdFilePtr pMetaFile = NULL;
100,794✔
970
  char*     serialized = NULL;
100,794✔
971

972
  TAOS_CHECK_GOTO(walFindCurMetaVer(pWal, &metaVer), &lino, _err);
100,794!
973
  // fsync the idx and log file at first to ensure validity of meta
974
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) {
100,778!
975
    wError("vgId:%d, failed to sync idx file since %s", pWal->cfg.vgId, strerror(ERRNO));
×
976
    TAOS_RETURN(TAOS_SYSTEM_ERROR(ERRNO));
×
977
  }
978

979
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) {
100,780!
980
    wError("vgId:%d, failed to sync log file since %s", pWal->cfg.vgId, strerror(ERRNO));
×
981
    TAOS_RETURN(TAOS_SYSTEM_ERROR(ERRNO));
×
982
  }
983

984
  // update synced offset
985
  walUpdateSyncedOffset(pWal);
100,781✔
986

987
  // flush to a tmpfile
988
  n = walBuildTmpMetaName(pWal, tmpFnameStr);
100,798✔
989
  if (n >= sizeof(tmpFnameStr)) {
100,798!
990
    TAOS_RETURN(TAOS_SYSTEM_ERROR(ERRNO));
×
991
  }
992

993
  pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
100,798✔
994
  if (pMetaFile == NULL) {
100,790!
995
    wError("vgId:%d, failed to open file %s since %s", pWal->cfg.vgId, tmpFnameStr, strerror(ERRNO));
×
996

997
    TAOS_RETURN(terrno);
×
998
  }
999

1000
  TAOS_CHECK_RETURN(walMetaSerialize(pWal, &serialized));
100,790!
1001
  int len = strlen(serialized);
100,794✔
1002
  if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) {
100,794!
1003
    wError("vgId:%d, failed to write file %s since %s", pWal->cfg.vgId, tmpFnameStr, strerror(ERRNO));
×
1004
    (void)taosCloseFile(&pMetaFile);
×
1005
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
1006
  }
1007

1008
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) {
100,790!
1009
    wError("vgId:%d, failed to sync file %s since %s", pWal->cfg.vgId, tmpFnameStr, strerror(ERRNO));
×
1010
    (void)taosCloseFile(&pMetaFile);
×
1011
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(ERRNO), &lino, _err);
×
1012
  }
1013

1014
  if (taosCloseFile(&pMetaFile) < 0) {
100,796!
1015
    wError("vgId:%d, failed to close file %s since %s", pWal->cfg.vgId, tmpFnameStr, strerror(ERRNO));
×
1016
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(ERRNO), &lino, _err);
×
1017
  }
1018
  wInfo("vgId:%d, save meta file %s, first index:%" PRId64 ", last index:%" PRId64, pWal->cfg.vgId, tmpFnameStr,
100,799✔
1019
        pWal->vers.firstVer, pWal->vers.lastVer);
1020

1021
  // rename it
1022
  n = walBuildMetaName(pWal, metaVer + 1, fnameStr);
100,800✔
1023
  if (n >= sizeof(fnameStr)) {
100,800!
1024
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _err);
×
1025
  }
1026

1027
  if (taosRenameFile(tmpFnameStr, fnameStr) < 0) {
100,800!
1028
    wError("failed to rename file from %s to %s since %s", tmpFnameStr, fnameStr, strerror(ERRNO));
×
1029
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(ERRNO), &lino, _err);
×
1030
  }
1031

1032
  // delete old file
1033
  if (metaVer > -1) {
100,800✔
1034
    n = walBuildMetaName(pWal, metaVer, fnameStr);
89,279✔
1035
    if (n >= sizeof(fnameStr)) {
89,279!
1036
      TAOS_RETURN(TAOS_SYSTEM_ERROR(ERRNO));
×
1037
    }
1038
    code = taosRemoveFile(fnameStr);
89,279✔
1039
    if (code) {
89,277!
1040
      wError("vgId:%d, failed to remove file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
×
1041
    } else {
1042
      wInfo("vgId:%d, remove old meta file %s", pWal->cfg.vgId, fnameStr);
89,277✔
1043
    }
1044
  }
1045

1046
  taosMemoryFree(serialized);
100,800!
1047
  return code;
100,800✔
1048

1049
_err:
×
1050
  wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
×
1051
  taosMemoryFree(serialized);
×
1052
  return code;
×
1053
}
1054
int32_t walLoadMeta(SWal* pWal) {
14,446✔
1055
  int32_t   code = 0;
14,446✔
1056
  int       n = 0;
14,446✔
1057
  int32_t   lino = 0;
14,446✔
1058
  char*     buf = NULL;
14,446✔
1059
  TdFilePtr pFile = NULL;
14,446✔
1060
  int64_t   metaVer = -1;
14,446✔
1061

1062
  // find existing meta file
1063
  TAOS_CHECK_EXIT(walFindCurMetaVer(pWal, &metaVer));
14,446!
1064

1065
  char fnameStr[WAL_FILE_LEN];
1066
  n = walBuildMetaName(pWal, metaVer, fnameStr);
14,446✔
1067
  if (n >= sizeof(fnameStr)) {
14,446!
1068
    TAOS_RETURN(TAOS_SYSTEM_ERROR(ERRNO));
×
1069
  }
1070
  // read metafile
1071
  int64_t fileSize = 0;
14,446✔
1072
  TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL));
14,446✔
1073
  if (fileSize == 0) {
3,024!
1074
    code = taosRemoveFile(fnameStr);
×
1075
    if (code) {
×
1076
      wError("vgId:%d, failed to remove file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
×
1077
    } else {
1078
      wInfo("vgId:%d, remove old meta file %s", pWal->cfg.vgId, fnameStr);
×
1079
    }
1080
    wDebug("vgId:%d, wal find empty meta index:%" PRIi64, pWal->cfg.vgId, metaVer);
×
1081

1082
    TAOS_RETURN(TSDB_CODE_FAILED);
×
1083
  }
1084
  int size = (int)fileSize;
3,024✔
1085
  buf = taosMemoryMalloc(size + 5);
3,024!
1086
  TSDB_CHECK_NULL(buf, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
3,024!
1087

1088
  (void)memset(buf, 0, size + 5);
3,024✔
1089
  pFile = taosOpenFile(fnameStr, TD_FILE_READ);
3,024✔
1090
  TSDB_CHECK_NULL(pFile, code, lino, _exit, terrno);
3,024!
1091

1092
  if (taosReadFile(pFile, buf, size) != size) {
3,024!
1093
    code = terrno;
×
1094
    goto _exit;
×
1095
  }
1096

1097
  // load into fileInfoSet
1098
  code = walMetaDeserialize(pWal, buf);
3,025✔
1099
  if (code < 0) {
3,024!
1100
    wError("vgId:%d, failed to deserialize wal meta, file:%s", pWal->cfg.vgId, fnameStr);
×
1101
    code = TSDB_CODE_WAL_FILE_CORRUPTED;
×
1102
  }
1103

1104
  wInfo("vgId:%d, meta file %s loaded, first index:%" PRId64 ", last index:%" PRId64 ", fileInfoSet size:%d",
3,024✔
1105
        pWal->cfg.vgId, fnameStr, pWal->vers.firstVer, pWal->vers.lastVer,
1106
        (int32_t)taosArrayGetSize(pWal->fileInfoSet));
1107
  printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "file in meta");
3,026✔
1108

1109
_exit:
14,448✔
1110
  if (code != TSDB_CODE_SUCCESS) {
14,448✔
1111
    wError("vgId:%d, failed to load meta file since %s, at line:%d", pWal->cfg.vgId, tstrerror(code), lino);
11,421✔
1112
  }
1113

1114
  taosMemoryFree(buf);
14,449!
1115
  (void)taosCloseFile(&pFile);
14,448✔
1116
  TAOS_RETURN(code);
14,449✔
1117
}
1118

1119
int32_t walRemoveMeta(SWal* pWal) {
97✔
1120
  int     code = 0;
97✔
1121
  int64_t metaVer = -1;
97✔
1122

1123
  if ((code = walFindCurMetaVer(pWal, &metaVer))) {
97!
1124
    wError("failed at line %d to find current meta file since %s", __LINE__, tstrerror(code));
×
1125
    TAOS_RETURN(code);
×
1126
  }
1127
  if (metaVer == -1) return 0;
97!
1128
  char fnameStr[WAL_FILE_LEN];
1129
  int  n = walBuildMetaName(pWal, metaVer, fnameStr);
97✔
1130
  if (n >= sizeof(fnameStr)) {
97!
1131
    TAOS_RETURN(TAOS_SYSTEM_ERROR(ERRNO));
×
1132
  }
1133
  return taosRemoveFile(fnameStr);
97✔
1134
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc