• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.6
/source/libs/wal/src/walMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "os.h"
18
#include "taoserror.h"
19
#include "tglobal.h"
20
#include "tutil.h"
21
#include "walInt.h"
22

23
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
×
24
  return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
×
25
}
26

27
bool FORCE_INLINE walIsEmpty(SWal* pWal) {
392,189✔
28
  return (pWal->vers.firstVer == -1 || pWal->vers.lastVer < pWal->vers.firstVer);  // [firstVer, lastVer + 1)
392,189!
29
}
30

31
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
498,842✔
32

33
int64_t FORCE_INLINE walGetSnapshotVer(SWal* pWal) { return pWal->vers.snapshotVer; }
2,369,831✔
34

35
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
12,953,430✔
36

37
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }
3,329,144✔
38

39
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }
863,166✔
40

41
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
42
  return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
102,821✔
43
}
44

45
static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
46
  return sprintf(buf, "%s/meta-ver.tmp", pWal->path);
111,571✔
47
}
48

49
static FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer) {
50
  int32_t       code = 0, lino = 0;
860✔
51
  int32_t       sz = taosArrayGetSize(pWal->fileInfoSet);
1,720✔
52
  int64_t       retVer = -1;
860✔
53
  void*         ptr = NULL;
860✔
54
  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
860✔
55

56
  char fnameStr[WAL_FILE_LEN];
57
  walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
860✔
58

59
  int64_t fileSize = 0;
860✔
60
  if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
860!
61
    wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
62
    code = terrno;
×
63
    goto _err;
×
64
  }
65

66
  TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
860✔
67
  if (pFile == NULL) {
860!
68
    wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
69
    *lastVer = retVer;
×
70
    TAOS_RETURN(terrno);
×
71
  }
72

73
  // ensure size as non-negative
74
  pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
860✔
75

76
  int64_t  stepSize = WAL_SCAN_BUF_SIZE;
860✔
77
  uint64_t magic = WAL_MAGIC;
860✔
78
  int64_t  walCkHeadSz = sizeof(SWalCkHead);
860✔
79
  int64_t  end = fileSize;
860✔
80
  int64_t  capacity = 0;
860✔
81
  int64_t  readSize = 0;
860✔
82
  char*    buf = NULL;
860✔
83
  int64_t  offset = TMIN(pFileInfo->fileSize, fileSize);
860✔
84
  int64_t  lastEntryBeginOffset = 0;
860✔
85
  int64_t  lastEntryEndOffset = 0;
860✔
86
  int64_t  recordLen = 0;
860✔
87
  bool     forwardStage = false;
860✔
88

89
  // check recover size
90
  if (2 * tsWalFsyncDataSizeLimit + offset < end) {
860!
91
    wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
×
92
          ", end:%" PRId64 ", file:%s",
93
          pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr);
94
  }
95

96
  // search for the valid last WAL entry, e.g. block by block
97
  while (1) {
×
98
    offset = (lastEntryEndOffset > 0) ? offset : TMAX(0, offset - stepSize + walCkHeadSz - 1);
860!
99
    end = TMIN(offset + stepSize, fileSize);
860✔
100

101
    readSize = end - offset;
860✔
102
    capacity = readSize + sizeof(magic);
860✔
103

104
    ptr = taosMemoryRealloc(buf, capacity);
860✔
105
    if (ptr == NULL) {
860!
106
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
107
    }
108
    buf = ptr;
860✔
109

110
    int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
860✔
111
    if (ret < 0) {
860!
112
      wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno), offset);
×
113
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
114
    }
115

116
    if (readSize != taosReadFile(pFile, buf, readSize)) {
860!
117
      wError("vgId:%d, failed to read file due to %s. readSize:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
×
118
             readSize, fnameStr);
119
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
120
    }
121

122
    char*       candidate = NULL;
860✔
123
    char*       haystack = buf;
860✔
124
    int64_t     pos = 0;
860✔
125
    SWalCkHead* logContent = NULL;
860✔
126

127
    while (true) {
110✔
128
      forwardStage = (lastEntryEndOffset > 0 || offset == 0);
970!
129
      terrno = TSDB_CODE_SUCCESS;
970✔
130
      if (forwardStage) {
970!
131
        candidate = (readSize - (haystack - buf)) > 0 ? haystack : NULL;
970✔
132
      } else {
133
        candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(magic));
×
134
      }
135

136
      if (candidate == NULL) break;
970✔
137
      pos = candidate - buf;
110✔
138

139
      // validate head
140
      int64_t len = readSize - pos;
110✔
141
      if (len < walCkHeadSz) {
110!
142
        break;
×
143
      }
144

145
      logContent = (SWalCkHead*)(buf + pos);
110✔
146
      if (walValidHeadCksum(logContent) != 0) {
110!
147
        code = TSDB_CODE_WAL_CHKSUM_MISMATCH;
×
148
        wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
×
149
              offset + pos, fnameStr);
150
        haystack = buf + pos + 1;
×
151
        if (forwardStage) {
×
152
          break;
×
153
        } else {
154
          continue;
×
155
        }
156
      }
157

158
      // validate body
159
      int32_t cryptedBodyLen = logContent->head.bodyLen;
110✔
160
      if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
110!
161
        cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
×
162
      }
163
      recordLen = walCkHeadSz + cryptedBodyLen;
110✔
164
      if (len < recordLen) {
110!
165
        int64_t extraSize = recordLen - len;
×
166
        if (capacity < readSize + extraSize + sizeof(magic)) {
×
167
          capacity += extraSize;
×
168
          void* ptr = taosMemoryRealloc(buf, capacity);
×
169
          if (ptr == NULL) {
×
170
            TAOS_CHECK_GOTO(terrno, &lino, _err);
×
171
          }
172
          buf = ptr;
×
173
        }
174
        int64_t ret = taosLSeekFile(pFile, offset + readSize, SEEK_SET);
×
175
        if (ret < 0) {
×
176
          wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(terrno),
×
177
                 offset);
178
          code = terrno;
×
179
          break;
×
180
        }
181
        if (extraSize != taosReadFile(pFile, buf + readSize, extraSize)) {
×
182
          wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", extraSize:%" PRId64 ", file:%s",
×
183
                 pWal->cfg.vgId, strerror(errno), offset + readSize, extraSize, fnameStr);
184
          code = terrno;
×
185
          break;
×
186
        }
187
      }
188

189
      logContent = (SWalCkHead*)(buf + pos);
110✔
190
      code = decryptBody(&pWal->cfg, logContent, logContent->head.bodyLen, __FUNCTION__);
110✔
191
      if (code) {
110!
192
        break;
×
193
      }
194

195
      if (walValidBodyCksum(logContent) != 0) {
110!
196
        code = TSDB_CODE_WAL_CHKSUM_MISMATCH;
×
197
        wWarn("vgId:%d, failed to validate checksum of wal entry body. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
×
198
              offset + pos, fnameStr);
199
        haystack = buf + pos + 1;
×
200
        if (forwardStage) {
×
201
          break;
×
202
        } else {
203
          continue;
×
204
        }
205
      }
206

207
      // found one
208
      retVer = logContent->head.version;
110✔
209
      lastEntryBeginOffset = offset + pos;
110✔
210
      lastEntryEndOffset = offset + pos + recordLen;
110✔
211

212
      // try next
213
      haystack = buf + pos + recordLen;
110✔
214
    }
215

216
    offset = (lastEntryEndOffset > 0) ? lastEntryEndOffset : offset;
860✔
217
    if (forwardStage && (terrno != TSDB_CODE_SUCCESS || end == fileSize)) break;
860!
218
  }
219

220
  if (retVer < 0) {
860✔
221
    code = TSDB_CODE_WAL_LOG_NOT_EXIST;
845✔
222
  }
223

224
  // truncate file
225
  if (lastEntryEndOffset != fileSize) {
860!
226
    wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
×
227
          lastEntryEndOffset, fileSize);
228

229
    if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
×
230
      wError("failed to truncate file due to %s. file:%s", strerror(terrno), fnameStr);
×
231
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
232
    }
233

234
    if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
×
235
      wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
×
236
      TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
×
237
    }
238
  }
239

240
  pFileInfo->fileSize = lastEntryEndOffset;
860✔
241

242
_err:
860✔
243
  taosCloseFile(&pFile);
860✔
244
  taosMemoryFree(buf);
860✔
245
  *lastVer = retVer;
860✔
246

247
  TAOS_RETURN(code);
860✔
248
}
249

250
static int32_t walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
14,220✔
251
  int metaFileNum = taosArrayGetSize(metaLogList);
14,220✔
252
  int actualFileNum = taosArrayGetSize(actualLogList);
14,220✔
253
  int j = 0;
14,220✔
254

255
  // both of the lists in asc order
256
  for (int i = 0; i < actualFileNum; i++) {
20,321✔
257
    SWalFileInfo* pLogInfo = taosArrayGet(actualLogList, i);
6,101✔
258
    while (j < metaFileNum) {
6,103!
259
      SWalFileInfo* pMetaInfo = taosArrayGet(metaLogList, j);
6,103✔
260
      if (pMetaInfo->firstVer < pLogInfo->firstVer) {
6,103✔
261
        j++;
2✔
262
      } else if (pMetaInfo->firstVer == pLogInfo->firstVer) {
6,101!
263
        (*pLogInfo) = *pMetaInfo;
6,101✔
264
        j++;
6,101✔
265
        break;
6,101✔
266
      } else {
267
        break;
×
268
      }
269
    }
270
  }
271

272
  taosArrayClear(metaLogList);
14,220✔
273

274
  for (int i = 0; i < actualFileNum; i++) {
20,321✔
275
    SWalFileInfo* pFileInfo = taosArrayGet(actualLogList, i);
6,101✔
276
    if (NULL == taosArrayPush(metaLogList, pFileInfo)) {
6,101!
277
      TAOS_RETURN(terrno);
×
278
    }
279
  }
280

281
  return TSDB_CODE_SUCCESS;
14,220✔
282
}
283

284
static void walAlignVersions(SWal* pWal) {
14,219✔
285
  if (pWal->cfg.committed > 0 && pWal->cfg.committed != pWal->vers.snapshotVer) {
14,219✔
286
    wWarn("vgId:%d, snapshotVer:%" PRId64 " in wal is different from commited:%" PRId64
79!
287
          ". in vnode/mnode. align with it.",
288
          pWal->cfg.vgId, pWal->vers.snapshotVer, pWal->cfg.committed);
289
    pWal->vers.snapshotVer = pWal->cfg.committed;
79✔
290
  }
291
  if (pWal->vers.snapshotVer < 0 && pWal->vers.firstVer > 0) {
14,219!
292
    wWarn("vgId:%d, snapshotVer:%" PRId64 " in wal is an invalid value. align it with firstVer:%" PRId64 ".",
×
293
          pWal->cfg.vgId, pWal->vers.snapshotVer, pWal->vers.firstVer);
294
    pWal->vers.snapshotVer = pWal->vers.firstVer;
×
295
  }
296
  if (pWal->vers.firstVer > pWal->vers.snapshotVer + 1) {
14,219!
297
    wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 " + 1. align with it.", pWal->cfg.vgId,
×
298
          pWal->vers.firstVer, pWal->vers.snapshotVer);
299
    pWal->vers.firstVer = pWal->vers.snapshotVer + 1;
×
300
  }
301
  if (pWal->vers.lastVer < pWal->vers.snapshotVer) {
14,219✔
302
    wWarn("vgId:%d, lastVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId,
106!
303
          pWal->vers.lastVer, pWal->vers.snapshotVer);
304
    if (pWal->vers.lastVer < pWal->vers.firstVer) {
106✔
305
      pWal->vers.firstVer = pWal->vers.snapshotVer + 1;
27✔
306
    }
307
    pWal->vers.lastVer = pWal->vers.snapshotVer;
106✔
308
  }
309
  // reset commitVer and appliedVer
310
  pWal->vers.commitVer = pWal->vers.snapshotVer;
14,219✔
311
  pWal->vers.appliedVer = pWal->vers.snapshotVer;
14,219✔
312
  wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
14,219✔
313
}
14,221✔
314

315
static int32_t walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
14,220✔
316
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,220✔
317
  int32_t fileIdx = -1;
14,220✔
318
  int32_t lastCloseTs = 0;
14,220✔
319
  char    fnameStr[WAL_FILE_LEN] = {0};
14,220✔
320

321
  while (++fileIdx < sz - 1) {
17,698✔
322
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
3,478✔
323
    if (pFileInfo->closeTs != -1) {
3,478!
324
      lastCloseTs = pFileInfo->closeTs;
3,478✔
325
      continue;
3,478✔
326
    }
327

328
    walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
×
329
    int32_t mtime = 0;
×
330
    if (taosStatFile(fnameStr, NULL, &mtime, NULL) < 0) {
×
331
      wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
332

333
      TAOS_RETURN(terrno);
×
334
    }
335

336
    if (updateMeta != NULL) *updateMeta = true;
×
337
    if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs;
×
338
    pFileInfo->closeTs = mtime;
×
339
    lastCloseTs = pFileInfo->closeTs;
×
340
  }
341

342
  TAOS_RETURN(TSDB_CODE_SUCCESS);
14,220✔
343
}
344

345
static int32_t walLogEntriesComplete(const SWal* pWal) {
14,220✔
346
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,220✔
347
  bool    complete = true;
14,220✔
348
  int32_t fileIdx = -1;
14,220✔
349
  int64_t index = pWal->vers.firstVer;
14,220✔
350

351
  while (++fileIdx < sz) {
20,321✔
352
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
6,101✔
353
    if (pFileInfo->firstVer != index) {
6,101!
354
      break;
×
355
    }
356
    index = pFileInfo->lastVer + ((fileIdx + 1 < sz) ? 1 : 0);
6,101✔
357
  }
358
  // empty is regarded as complete
359
  if (sz != 0) {
14,220✔
360
    complete = (index == pWal->vers.lastVer);
2,623✔
361
  }
362

363
  if (!complete) {
14,220!
364
    wError("vgId:%d, WAL log entries incomplete in range [%" PRId64 ", %" PRId64 "], index:%" PRId64
×
365
           ", snaphotVer:%" PRId64,
366
           pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.lastVer, index, pWal->vers.snapshotVer);
367
    TAOS_RETURN(TSDB_CODE_WAL_LOG_INCOMPLETE);
×
368
  } else {
369
    TAOS_RETURN(TSDB_CODE_SUCCESS);
14,220✔
370
  }
371
}
372

373
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
860✔
374
  int32_t       code = 0;
860✔
375
  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
860✔
376
  if (!pFileInfo) {
860!
377
    TAOS_RETURN(TSDB_CODE_FAILED);
×
378
  }
379

380
  char fnameStr[WAL_FILE_LEN];
381
  walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
860✔
382

383
  int64_t fileSize = 0;
860✔
384
  if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
860!
385
    wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
386
    code = terrno;
×
387
    TAOS_RETURN(code);
×
388
  }
389
  int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
860✔
390
  int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
860✔
391

392
  if (fileSize <= lastEndOffset) {
860✔
393
    TAOS_RETURN(TSDB_CODE_SUCCESS);
845✔
394
  }
395

396
  TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
15✔
397
  if (pFile == NULL) {
15!
398
    TAOS_RETURN(terrno);
×
399
  }
400

401
  wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
15!
402
        lastEndOffset);
403

404
  code = taosFtruncateFile(pFile, lastEndOffset);
15✔
405
  if (code < 0) {
15!
406
    wError("vgId:%d, failed to truncate file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
407
    TAOS_RETURN(code);
×
408
  }
409
  (void)taosCloseFile(&pFile);
15✔
410

411
  TAOS_RETURN(TSDB_CODE_SUCCESS);
15✔
412
}
413

414
static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) {
45,401✔
415
  int32_t sz = taosArrayGetSize(fileSet);
45,401✔
416
  for (int32_t i = 0; i < sz; i++) {
69,807✔
417
    SWalFileInfo* pFileInfo = taosArrayGet(fileSet, i);
24,406✔
418
    wInfo("vgId:%d, %s-%d, firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileSize:%" PRId64 ", syncedOffset:%" PRId64
24,406!
419
          ", createTs:%" PRId64 ", closeTs:%" PRId64,
420
          vgId, str, i, pFileInfo->firstVer, pFileInfo->lastVer, pFileInfo->fileSize, pFileInfo->syncedOffset,
421
          pFileInfo->createTs, pFileInfo->closeTs);
422
  }
423
}
45,401✔
424

425
int32_t walCheckAndRepairMeta(SWal* pWal) {
14,220✔
426
  // load log files, get first/snapshot/last version info
427
  int32_t     code = 0;
14,220✔
428
  const char* logPattern = "^[0-9]+.log$";
14,220✔
429
  const char* idxPattern = "^[0-9]+.idx$";
14,220✔
430
  regex_t     logRegPattern;
431
  regex_t     idxRegPattern;
432

433
  wInfo("vgId:%d, begin to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
14,220!
434
        pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
435

436
  if (regcomp(&logRegPattern, logPattern, REG_EXTENDED) != 0) {
14,220!
437
    wError("failed to compile log pattern, error:%s", tstrerror(terrno));
×
438
    return terrno;
×
439
  }
440
  if (regcomp(&idxRegPattern, idxPattern, REG_EXTENDED) != 0) {
14,219!
441
    wError("failed to compile idx pattern");
×
442
    return terrno;
×
443
  }
444

445
  TdDirPtr pDir = taosOpenDir(pWal->path);
14,219✔
446
  if (pDir == NULL) {
14,217!
447
    regfree(&logRegPattern);
×
448
    regfree(&idxRegPattern);
×
449
    wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
×
450
    return terrno;
×
451
  }
452

453
  SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
14,217✔
454

455
  // scan log files and build new meta
456
  TdDirEntryPtr pDirEntry;
457
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
57,595✔
458
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
43,369✔
459
    int   code = regexec(&logRegPattern, name, 0, NULL, 0);
43,377✔
460
    if (code == 0) {
43,376✔
461
      SWalFileInfo fileInfo;
462
      (void)memset(&fileInfo, -1, sizeof(SWalFileInfo));
6,100✔
463
      (void)sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
6,100✔
464
      if (!taosArrayPush(actualLog, &fileInfo)) {
6,100!
465
        regfree(&logRegPattern);
×
466
        regfree(&idxRegPattern);
×
467
        int32_t ret = taosCloseDir(&pDir);
×
468
        if (ret != 0) {
×
469
          wError("failed to close dir, ret:%s", tstrerror(ret));
×
470
          return terrno;
×
471
        }
472

473
        return terrno;
×
474
      }
475
    }
476
  }
477

478
  int32_t ret = taosCloseDir(&pDir);
14,216✔
479
  if (ret != 0) {
14,220!
480
    wError("failed to close dir, ret:%s", tstrerror(ret));
×
481
    return terrno;
×
482
  }
483
  regfree(&logRegPattern);
14,220✔
484
  regfree(&idxRegPattern);
14,219✔
485

486
  taosArraySort(actualLog, compareWalFileInfo);
14,211✔
487

488
  wInfo("vgId:%d, actual log file, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
14,212!
489
        (int32_t)taosArrayGetSize(actualLog));
490
  printFileSet(pWal->cfg.vgId, actualLog, "actual log file");
14,217✔
491

492
  int     metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
14,220✔
493
  int     actualFileNum = taosArrayGetSize(actualLog);
14,220✔
494
  int64_t firstVerPrev = pWal->vers.firstVer;
14,220✔
495
  int64_t lastVerPrev = pWal->vers.lastVer;
14,220✔
496
  int64_t totSize = 0;
14,220✔
497
  bool    updateMeta = (metaFileNum != actualFileNum);
14,220✔
498

499
  // rebuild meta of file info
500
  code = walRebuildFileInfoSet(pWal->fileInfoSet, actualLog);
14,220✔
501
  taosArrayDestroy(actualLog);
14,220✔
502
  if (code) {
14,220!
503
    TAOS_RETURN(code);
×
504
  }
505

506
  wInfo("vgId:%d, log file in meta, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
14,220!
507
        (int32_t)taosArrayGetSize(pWal->fileInfoSet));
508
  printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "log file in meta");
14,220✔
509

510
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,220✔
511

512
  // scan and determine the lastVer
513
  int32_t fileIdx = sz;
14,220✔
514

515
  while (--fileIdx >= 0) {
20,319✔
516
    char          fnameStr[WAL_FILE_LEN];
517
    int64_t       fileSize = 0;
6,100✔
518
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
6,100✔
519

520
    walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
6,100✔
521
    int32_t code = taosStatFile(fnameStr, &fileSize, NULL, NULL);
6,101✔
522
    if (code < 0) {
6,100!
523
      wError("failed to stat file since %s. file:%s", terrstr(), fnameStr);
×
524

525
      TAOS_RETURN(terrno);
×
526
    }
527

528
    if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) {
6,100✔
529
      totSize += pFileInfo->fileSize;
5,239✔
530
      continue;
5,239✔
531
    }
532
    updateMeta = true;
861✔
533

534
    TAOS_CHECK_RETURN(walTrimIdxFile(pWal, fileIdx));
861!
535

536
    int64_t lastVer = -1;
860✔
537
    code = walScanLogGetLastVer(pWal, fileIdx, &lastVer);
860✔
538
    if (lastVer < 0) {
860✔
539
      if (code != TSDB_CODE_WAL_LOG_NOT_EXIST) {
845!
540
        wError("failed to scan wal last ver since %s", terrstr());
×
541

542
        TAOS_RETURN(code);
×
543
      }
544
      // empty log file
545
      lastVer = pFileInfo->firstVer - 1;
845✔
546

547
      code = TSDB_CODE_SUCCESS;
845✔
548
    }
549

550
    // update lastVer
551
    pFileInfo->lastVer = lastVer;
860✔
552
    totSize += pFileInfo->fileSize;
860✔
553
  }
554

555
  // reset vers info and so on
556
  actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
14,219✔
557
  pWal->writeCur = actualFileNum - 1;
14,219✔
558
  pWal->totSize = totSize;
14,219✔
559
  pWal->vers.lastVer = -1;
14,219✔
560
  if (actualFileNum > 0) {
14,219✔
561
    pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
2,622✔
562
    pWal->vers.lastVer = ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer;
2,622✔
563
  }
564
  walAlignVersions(pWal);
14,219✔
565

566
  // repair ts of files
567
  TAOS_CHECK_RETURN(walRepairLogFileTs(pWal, &updateMeta));
14,220!
568

569
  wInfo("vgId:%d, log file after repair, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
14,220!
570
        (int32_t)taosArrayGetSize(pWal->fileInfoSet));
571
  printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "file after repair");
14,220✔
572
  // update meta file
573
  if (updateMeta) {
14,220✔
574
    TAOS_CHECK_RETURN(walSaveMeta(pWal));
860!
575
  }
576

577
  TAOS_CHECK_RETURN(walLogEntriesComplete(pWal));
14,220!
578

579
  wInfo("vgId:%d, success to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
14,220!
580
        pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
581

582
  return code;
14,220✔
583
}
584

585
static int32_t walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
51✔
586
  if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) {
51!
587
    TAOS_RETURN(terrno);
×
588
  }
589

590
  if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
51!
591
    TAOS_RETURN(terrno);
×
592
  }
593

594
  if (walValidHeadCksum(pCkHead) != 0) {
51!
595
    TAOS_RETURN(TSDB_CODE_WAL_CHKSUM_MISMATCH);
×
596
  }
597

598
  return TSDB_CODE_SUCCESS;
51✔
599
}
600

601
static int32_t walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
6,101✔
602
  int32_t       code = 0, lino = 0;
6,101✔
603
  int32_t       sz = taosArrayGetSize(pWal->fileInfoSet);
6,101✔
604
  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
6,101✔
605
  char          fnameStr[WAL_FILE_LEN];
606
  walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
6,101✔
607
  char fLogNameStr[WAL_FILE_LEN];
608
  walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
6,101✔
609
  int64_t fileSize = 0;
6,101✔
610

611
  if (taosStatFile(fnameStr, &fileSize, NULL, NULL) < 0 && errno != ENOENT) {
6,101!
612
    wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
613

614
    TAOS_RETURN(terrno);
×
615
  }
616

617
  if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
6,101✔
618
    TAOS_RETURN(TSDB_CODE_SUCCESS);
6,086✔
619
  }
620

621
  // start to repair
622
  int64_t      offset = fileSize - fileSize % sizeof(SWalIdxEntry);
15✔
623
  TdFilePtr    pLogFile = NULL;
15✔
624
  TdFilePtr    pIdxFile = NULL;
15✔
625
  SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer - 1, .offset = -sizeof(SWalCkHead)};
15✔
626
  SWalCkHead   ckHead;
627
  (void)memset(&ckHead, 0, sizeof(ckHead));
15✔
628
  ckHead.head.version = idxEntry.ver;
15✔
629

630
  pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
15✔
631
  if (pIdxFile == NULL) {
15!
632
    wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
633

634
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
635
  }
636

637
  pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
15✔
638
  if (pLogFile == NULL) {
15!
639
    wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
×
640

641
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
642
  }
643

644
  // determine the last valid entry end, i.e. offset
645
  while ((offset -= sizeof(SWalIdxEntry)) >= 0) {
15✔
646
    if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) {
6!
647
      wError("vgId:%d, failed to seek file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(terrno),
×
648
             offset, fnameStr);
649

650
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
651
    }
652

653
    if (taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
6!
654
      wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(terrno),
×
655
             offset, fnameStr);
656

657
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
658
    }
659

660
    if (idxEntry.ver > pFileInfo->lastVer) {
6!
661
      continue;
×
662
    }
663

664
    if (offset != (idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry)) {
6!
665
      continue;
×
666
    }
667

668
    if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
6!
669
      wWarn("vgId:%d, failed to read log file since %s. file:%s, offset:%" PRId64 ", idx entry ver:%" PRId64 "",
×
670
            pWal->cfg.vgId, terrstr(), fLogNameStr, idxEntry.offset, idxEntry.ver);
671
      continue;
×
672
    }
673

674
    if (idxEntry.ver == ckHead.head.version) {
6!
675
      break;
6✔
676
    }
677
  }
678
  offset += sizeof(SWalIdxEntry);
15✔
679

680
  /*A(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));*/
681

682
  // ftruncate idx file
683
  if (offset < fileSize) {
15!
684
    if (taosFtruncateFile(pIdxFile, offset) < 0) {
×
685
      wError("vgId:%d, failed to ftruncate file since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(),
×
686
             offset, fnameStr);
687

688
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
689
    }
690
  }
691

692
  // rebuild idx file
693
  if (taosLSeekFile(pIdxFile, 0, SEEK_END) < 0) {
15!
694
    wError("vgId:%d, failed to seek file since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(), offset,
×
695
           fnameStr);
696

697
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
698
  }
699

700
  int64_t count = 0;
15✔
701
  while (idxEntry.ver < pFileInfo->lastVer) {
60✔
702
    /*A(idxEntry.ver == ckHead.head.version);*/
703

704
    idxEntry.ver += 1;
45✔
705

706
    int32_t plainBodyLen = ckHead.head.bodyLen;
45✔
707
    int32_t cryptedBodyLen = plainBodyLen;
45✔
708
    if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
45!
709
      cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
×
710
    }
711
    idxEntry.offset += sizeof(SWalCkHead) + cryptedBodyLen;
45✔
712

713
    code = walReadLogHead(pLogFile, idxEntry.offset, &ckHead);
45✔
714
    if (code) {
45!
715
      wError("vgId:%d, failed to read wal log head since %s. index:%" PRId64 ", offset:%" PRId64 ", file:%s",
×
716
             pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr);
717

718
      TAOS_CHECK_GOTO(code, &lino, _err);
×
719
    }
720
    if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
45!
721
      wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
×
722

723
      TAOS_CHECK_GOTO(terrno, &lino, _err);
×
724
    }
725
    count++;
45✔
726
  }
727

728
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) {
15!
729
    wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
×
730

731
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
×
732
  }
733

734
  if (count > 0) {
15!
735
    wInfo("vgId:%d, rebuilt %" PRId64 " wal idx entries until lastVer: %" PRId64, pWal->cfg.vgId, count,
15!
736
          pFileInfo->lastVer);
737
  }
738

739
_err:
×
740
  if (code) {
15!
741
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
×
742
  }
743

744
  (void)taosCloseFile(&pLogFile);
15✔
745
  (void)taosCloseFile(&pIdxFile);
15✔
746

747
  TAOS_RETURN(code);
15✔
748
}
749

750
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
1,689✔
751
  int64_t ver = -1;
1,689✔
752
  int64_t totSize = 0;
1,689✔
753
  if (taosThreadRwlockRdlock(&pWal->mutex) != 0) {
1,689!
754
    wError("vgId:%d failed to lock %p", pWal->cfg.vgId, &pWal->mutex);
×
755
  }
756
  int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
1,689✔
757
  while (--fileIdx) {
7,982✔
758
    SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
6,293✔
759
    if (totSize >= bytes) {
6,293!
760
      ver = pInfo->lastVer;
×
761
      break;
×
762
    }
763
    totSize += pInfo->fileSize;
6,293✔
764
  }
765
  if (taosThreadRwlockUnlock(&pWal->mutex) != 0) {
1,689!
766
    wError("vgId:%d failed to lock %p", pWal->cfg.vgId, &pWal->mutex);
×
767
  }
768
  return ver + 1;
1,689✔
769
}
770

771
int32_t walCheckAndRepairIdx(SWal* pWal) {
14,220✔
772
  int32_t code = 0;
14,220✔
773
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
14,220✔
774
  int32_t fileIdx = sz;
14,220✔
775

776
  while (--fileIdx >= 0) {
20,321✔
777
    code = walCheckAndRepairIdxFile(pWal, fileIdx);
6,101✔
778
    if (code) {
6,101!
779
      wError("vgId:%d, failed to repair idx file since %s. fileIdx:%d", pWal->cfg.vgId, terrstr(), fileIdx);
×
780

781
      TAOS_RETURN(code);
×
782
    }
783
  }
784

785
  TAOS_RETURN(code);
14,220✔
786
}
787

788
int32_t walRollFileInfo(SWal* pWal) {
40,793✔
789
  int64_t ts = taosGetTimestampSec();
40,793✔
790

791
  SArray* pArray = pWal->fileInfoSet;
40,794✔
792
  if (taosArrayGetSize(pArray) != 0) {
40,794✔
793
    SWalFileInfo* pInfo = taosArrayGetLast(pArray);
29,317✔
794
    pInfo->lastVer = pWal->vers.lastVer;
29,317✔
795
    pInfo->closeTs = ts;
29,317✔
796
  }
797

798
  // TODO: change to emplace back
799
  SWalFileInfo* pNewInfo = taosMemoryMalloc(sizeof(SWalFileInfo));
40,794✔
800
  if (pNewInfo == NULL) {
40,793!
801
    TAOS_RETURN(terrno);
×
802
  }
803
  pNewInfo->firstVer = pWal->vers.lastVer + 1;
40,793✔
804
  pNewInfo->lastVer = -1;
40,793✔
805
  pNewInfo->createTs = ts;
40,793✔
806
  pNewInfo->closeTs = -1;
40,793✔
807
  pNewInfo->fileSize = 0;
40,793✔
808
  pNewInfo->syncedOffset = 0;
40,793✔
809
  if (!taosArrayPush(pArray, pNewInfo)) {
40,793!
810
    taosMemoryFree(pNewInfo);
×
811
    TAOS_RETURN(terrno);
×
812
  }
813

814
  taosMemoryFree(pNewInfo);
40,793✔
815
  TAOS_RETURN(TSDB_CODE_SUCCESS);
40,793✔
816
}
817

818
int32_t walMetaSerialize(SWal* pWal, char** serialized) {
111,562✔
819
  char   buf[30];
820
  int    sz = taosArrayGetSize(pWal->fileInfoSet);
111,562✔
821
  cJSON* pRoot = cJSON_CreateObject();
111,566✔
822
  cJSON* pMeta = cJSON_CreateObject();
111,567✔
823
  cJSON* pFiles = cJSON_CreateArray();
111,568✔
824
  cJSON* pField;
825
  if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
111,570!
826
    if (pRoot) {
×
827
      cJSON_Delete(pRoot);
×
828
    }
829
    if (pMeta) {
×
830
      cJSON_Delete(pMeta);
×
831
    }
832
    if (pFiles) {
×
833
      cJSON_Delete(pFiles);
×
834
    }
835

836
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
837
  }
838
  if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) {
111,570!
839
    wInfo("vgId:%d, failed to add meta to root", pWal->cfg.vgId);
×
840
  }
841
  (void)sprintf(buf, "%" PRId64, pWal->vers.firstVer);
111,563✔
842
  if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) {
111,563!
843
    wInfo("vgId:%d, failed to add firstVer to meta", pWal->cfg.vgId);
×
844
  }
845
  (void)sprintf(buf, "%" PRId64, pWal->vers.snapshotVer);
111,564✔
846
  if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) {
111,564!
847
    wInfo("vgId:%d, failed to add snapshotVer to meta", pWal->cfg.vgId);
×
848
  }
849
  (void)sprintf(buf, "%" PRId64, pWal->vers.commitVer);
111,568✔
850
  if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) {
111,568!
851
    wInfo("vgId:%d, failed to add commitVer to meta", pWal->cfg.vgId);
×
852
  }
853
  (void)sprintf(buf, "%" PRId64, pWal->vers.lastVer);
111,570✔
854
  if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) {
111,570!
855
    wInfo("vgId:%d, failed to add lastVer to meta", pWal->cfg.vgId);
×
856
  }
857

858
  if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) {
111,570!
859
    wInfo("vgId:%d, failed to add files to root", pWal->cfg.vgId);
×
860
  }
861
  SWalFileInfo* pData = pWal->fileInfoSet->pData;
111,570✔
862
  for (int i = 0; i < sz; i++) {
20,733,746✔
863
    SWalFileInfo* pInfo = &pData[i];
20,622,054✔
864
    if (!cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject())) {
20,622,054!
865
      wInfo("vgId:%d, failed to add field to files", pWal->cfg.vgId);
×
866
    }
867
    if (pField == NULL) {
20,586,184!
868
      cJSON_Delete(pRoot);
×
869

870
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
871
    }
872
    // cjson only support int32_t or double
873
    // string are used to prohibit the loss of precision
874
    (void)sprintf(buf, "%" PRId64, pInfo->firstVer);
20,586,184✔
875
    if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) {
20,586,184!
876
      wInfo("vgId:%d, failed to add firstVer to field", pWal->cfg.vgId);
×
877
    }
878
    (void)sprintf(buf, "%" PRId64, pInfo->lastVer);
20,604,121✔
879
    if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) {
20,604,121!
880
      wInfo("vgId:%d, failed to add lastVer to field", pWal->cfg.vgId);
×
881
    }
882
    (void)sprintf(buf, "%" PRId64, pInfo->createTs);
20,582,633✔
883
    if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) {
20,582,633!
884
      wInfo("vgId:%d, failed to add createTs to field", pWal->cfg.vgId);
×
885
    }
886
    (void)sprintf(buf, "%" PRId64, pInfo->closeTs);
20,600,016✔
887
    if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) {
20,600,016!
888
      wInfo("vgId:%d, failed to add closeTs to field", pWal->cfg.vgId);
×
889
    }
890
    (void)sprintf(buf, "%" PRId64, pInfo->fileSize);
20,618,498✔
891
    if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) {
20,618,498!
892
      wInfo("vgId:%d, failed to add fileSize to field", pWal->cfg.vgId);
×
893
    }
894
  }
895
  char* pSerialized = cJSON_Print(pRoot);
111,692✔
896
  cJSON_Delete(pRoot);
111,569✔
897

898
  *serialized = pSerialized;
111,568✔
899

900
  TAOS_RETURN(TSDB_CODE_SUCCESS);
111,568✔
901
}
902

903
int32_t walMetaDeserialize(SWal* pWal, const char* bytes) {
2,740✔
904
  /*A(taosArrayGetSize(pWal->fileInfoSet) == 0);*/
905
  cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField;
906
  pRoot = cJSON_Parse(bytes);
2,740✔
907
  if (!pRoot) goto _err;
2,740!
908
  pMeta = cJSON_GetObjectItem(pRoot, "meta");
2,740✔
909
  if (!pMeta) goto _err;
2,741!
910
  pField = cJSON_GetObjectItem(pMeta, "firstVer");
2,741✔
911
  if (!pField) goto _err;
2,741!
912
  pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField));
2,741✔
913
  pField = cJSON_GetObjectItem(pMeta, "snapshotVer");
2,741✔
914
  if (!pField) goto _err;
2,741!
915
  pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField));
2,741✔
916
  pField = cJSON_GetObjectItem(pMeta, "commitVer");
2,741✔
917
  if (!pField) goto _err;
2,741!
918
  pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField));
2,741✔
919
  pField = cJSON_GetObjectItem(pMeta, "lastVer");
2,741✔
920
  if (!pField) goto _err;
2,741!
921
  pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField));
2,741✔
922

923
  pFiles = cJSON_GetObjectItem(pRoot, "files");
2,741✔
924
  int sz = cJSON_GetArraySize(pFiles);
2,741✔
925
  // deserialize
926
  SArray* pArray = pWal->fileInfoSet;
2,741✔
927
  if (taosArrayEnsureCap(pArray, sz)) {
2,741!
928
    cJSON_Delete(pRoot);
×
929
    return terrno;
×
930
  }
931

932
  for (int i = 0; i < sz; i++) {
8,841✔
933
    pInfoJson = cJSON_GetArrayItem(pFiles, i);
6,100✔
934
    if (!pInfoJson) goto _err;
6,095!
935

936
    SWalFileInfo info = {0};
6,095✔
937

938
    pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
6,095✔
939
    if (!pField) goto _err;
6,101!
940
    info.firstVer = atoll(cJSON_GetStringValue(pField));
6,101✔
941
    pField = cJSON_GetObjectItem(pInfoJson, "lastVer");
6,101✔
942
    if (!pField) goto _err;
6,102!
943
    info.lastVer = atoll(cJSON_GetStringValue(pField));
6,102✔
944
    pField = cJSON_GetObjectItem(pInfoJson, "createTs");
6,102✔
945
    if (!pField) goto _err;
6,099!
946
    info.createTs = atoll(cJSON_GetStringValue(pField));
6,099✔
947
    pField = cJSON_GetObjectItem(pInfoJson, "closeTs");
6,099✔
948
    if (!pField) goto _err;
6,099!
949
    info.closeTs = atoll(cJSON_GetStringValue(pField));
6,099✔
950
    pField = cJSON_GetObjectItem(pInfoJson, "fileSize");
6,099✔
951
    if (!pField) goto _err;
6,097!
952
    info.fileSize = atoll(cJSON_GetStringValue(pField));
6,097✔
953
    if (!taosArrayPush(pArray, &info)) {
6,100!
954
      cJSON_Delete(pRoot);
×
955
      return terrno;
×
956
    }
957
  }
958
  pWal->fileInfoSet = pArray;
2,741✔
959
  pWal->writeCur = sz - 1;
2,741✔
960
  cJSON_Delete(pRoot);
2,741✔
961
  return TSDB_CODE_SUCCESS;
2,741✔
962

963
_err:
×
964
  cJSON_Delete(pRoot);
×
965
  return TSDB_CODE_FAILED;
×
966
}
967

968
static int walFindCurMetaVer(SWal* pWal) {
125,876✔
969
  const char* pattern = "^meta-ver[0-9]+$";
125,876✔
970
  regex_t     walMetaRegexPattern;
971
  if (regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED) != 0) {
125,876!
972
    wError("failed to compile wal meta pattern, error %s", tstrerror(terrno));
×
973
    return terrno;
×
974
  }
975

976
  TdDirPtr pDir = taosOpenDir(pWal->path);
125,833✔
977
  if (pDir == NULL) {
125,873✔
978
    wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
5!
979
    regfree(&walMetaRegexPattern);
5✔
980
    return terrno;
×
981
  }
982

983
  TdDirEntryPtr pDirEntry;
984

985
  // find existing meta-ver[x].json
986
  int metaVer = -1;
125,868✔
987
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
35,567,005✔
988
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
35,424,133✔
989
    int   code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
35,334,049✔
990
    if (code == 0) {
35,543,703✔
991
      (void)sscanf(name, "meta-ver%d", &metaVer);
102,822✔
992
      wDebug("vgId:%d, wal find current meta: %s is the meta file, ver %d", pWal->cfg.vgId, name, metaVer);
102,822✔
993
      break;
102,816✔
994
    }
995
    wDebug("vgId:%d, wal find current meta: %s is not meta file", pWal->cfg.vgId, name);
35,440,881✔
996
  }
997
  if (taosCloseDir(&pDir) != 0) {
121,466!
998
    wError("failed to close dir, ret:%s", tstrerror(terrno));
×
999
    regfree(&walMetaRegexPattern);
×
1000
    return terrno;
×
1001
  }
1002
  regfree(&walMetaRegexPattern);
125,883✔
1003
  return metaVer;
125,865✔
1004
}
1005

1006
static void walUpdateSyncedOffset(SWal* pWal) {
111,568✔
1007
  SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
111,568✔
1008
  if (pFileInfo == NULL) return;
111,571✔
1009
  pFileInfo->syncedOffset = pFileInfo->fileSize;
111,366✔
1010
}
1011

1012
int32_t walSaveMeta(SWal* pWal) {
111,571✔
1013
  int  code = 0, lino = 0;
111,571✔
1014
  int  metaVer = walFindCurMetaVer(pWal);
111,571✔
1015
  char fnameStr[WAL_FILE_LEN];
1016
  char tmpFnameStr[WAL_FILE_LEN];
1017
  int  n;
1018

1019
  // fsync the idx and log file at first to ensure validity of meta
1020
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) {
111,555!
1021
    wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno));
×
1022

1023
    TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
×
1024
  }
1025

1026
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) {
111,553!
1027
    wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno));
×
1028

1029
    TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
×
1030
  }
1031

1032
  // update synced offset
1033
  walUpdateSyncedOffset(pWal);
111,552✔
1034

1035
  // flush to a tmpfile
1036
  n = walBuildTmpMetaName(pWal, tmpFnameStr);
111,571✔
1037
  if (n >= sizeof(tmpFnameStr)) {
111,571!
1038
    TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
×
1039
  }
1040

1041
  TdFilePtr pMetaFile =
111,566✔
1042
      taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
111,571✔
1043
  if (pMetaFile == NULL) {
111,566!
1044
    wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
×
1045

1046
    TAOS_RETURN(terrno);
×
1047
  }
1048

1049
  char* serialized = NULL;
111,566✔
1050
  TAOS_CHECK_RETURN(walMetaSerialize(pWal, &serialized));
111,566!
1051
  int len = strlen(serialized);
111,565✔
1052
  if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) {
111,565!
1053
    wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
×
1054

1055
    TAOS_CHECK_GOTO(terrno, &lino, _err);
×
1056
  }
1057

1058
  if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) {
111,561!
1059
    wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
×
1060

1061
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
×
1062
  }
1063

1064
  if (taosCloseFile(&pMetaFile) < 0) {
111,567!
1065
    wError("vgId:%d, failed to close file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
×
1066

1067
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
×
1068
  }
1069
  wInfo("vgId:%d, save meta file: %s, firstVer:%" PRId64 ", lastVer:%" PRId64, pWal->cfg.vgId, tmpFnameStr,
111,571!
1070
        pWal->vers.firstVer, pWal->vers.lastVer);
1071

1072
  // rename it
1073
  n = walBuildMetaName(pWal, metaVer + 1, fnameStr);
111,571✔
1074
  if (n >= sizeof(fnameStr)) {
111,571!
1075
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _err);
×
1076
  }
1077

1078
  if (taosRenameFile(tmpFnameStr, fnameStr) < 0) {
111,571!
1079
    wError("failed to rename file due to %s. dest:%s", strerror(errno), fnameStr);
×
1080

1081
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
×
1082
  }
1083

1084
  // delete old file
1085
  if (metaVer > -1) {
111,568✔
1086
    n = walBuildMetaName(pWal, metaVer, fnameStr);
99,996✔
1087
    if (n >= sizeof(fnameStr)) {
99,996!
1088
      TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
×
1089
    }
1090
    code = taosRemoveFile(fnameStr);
99,996✔
1091
    if (code) {
99,996!
1092
      wError("vgId:%d, failed to remove file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
1093
    } else {
1094
      wInfo("vgId:%d, remove old meta file: %s", pWal->cfg.vgId, fnameStr);
99,996✔
1095
    }
1096
  }
1097

1098
  taosMemoryFree(serialized);
111,573✔
1099
  return code;
111,571✔
1100

1101
_err:
×
1102
  wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
×
1103
  (void)taosCloseFile(&pMetaFile);
×
1104
  taosMemoryFree(serialized);
×
1105
  return code;
×
1106
}
1107

1108
int32_t walLoadMeta(SWal* pWal) {
14,207✔
1109
  int32_t code = 0;
14,207✔
1110
  int     n = 0;
14,207✔
1111
  // find existing meta file
1112
  int metaVer = walFindCurMetaVer(pWal);
14,207✔
1113
  if (metaVer == -1) {
14,224✔
1114
    wDebug("vgId:%d, wal find meta ver %d", pWal->cfg.vgId, metaVer);
11,483✔
1115

1116
    TAOS_RETURN(TSDB_CODE_FAILED);
11,483✔
1117
  }
1118
  char fnameStr[WAL_FILE_LEN];
1119
  n = walBuildMetaName(pWal, metaVer, fnameStr);
2,741✔
1120
  if (n >= sizeof(fnameStr)) {
2,741!
1121
    TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
×
1122
  }
1123
  // read metafile
1124
  int64_t fileSize = 0;
2,741✔
1125
  if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
2,741!
UNCOV
1126
    wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
UNCOV
1127
    code = terrno;
×
1128
    TAOS_RETURN(code);
×
1129
  }
1130
  if (fileSize == 0) {
2,741!
1131
    code = taosRemoveFile(fnameStr);
×
1132
    if (code) {
×
1133
      wError("vgId:%d, failed to remove file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
×
1134
    } else {
1135
      wInfo("vgId:%d, remove old meta file: %s", pWal->cfg.vgId, fnameStr);
×
1136
    }
1137
    wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
×
1138

1139
    TAOS_RETURN(TSDB_CODE_FAILED);
×
1140
  }
1141
  int   size = (int)fileSize;
2,741✔
1142
  char* buf = taosMemoryMalloc(size + 5);
2,741✔
1143
  if (buf == NULL) {
2,741!
1144
    TAOS_RETURN(terrno);
×
1145
  }
1146
  (void)memset(buf, 0, size + 5);
2,741✔
1147
  TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
2,741✔
1148
  if (pFile == NULL) {
2,740!
1149
    taosMemoryFree(buf);
×
1150

1151
    TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
×
1152
  }
1153
  if (taosReadFile(pFile, buf, size) != size) {
2,740!
1154
    (void)taosCloseFile(&pFile);
×
1155
    taosMemoryFree(buf);
×
1156

1157
    TAOS_RETURN(terrno);
×
1158
  }
1159
  // load into fileInfoSet
1160
  code = walMetaDeserialize(pWal, buf);
2,740✔
1161
  if (code < 0) {
2,741!
1162
    wError("failed to deserialize wal meta. file:%s", fnameStr);
×
1163
    code = TSDB_CODE_WAL_FILE_CORRUPTED;
×
1164
  }
1165
  (void)taosCloseFile(&pFile);
2,741✔
1166
  taosMemoryFree(buf);
2,741✔
1167

1168
  wInfo("vgId:%d, meta file loaded: %s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileInfoSet size:%d", pWal->cfg.vgId,
2,741!
1169
        fnameStr, pWal->vers.firstVer, pWal->vers.lastVer, (int32_t)taosArrayGetSize(pWal->fileInfoSet));
1170
  printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "file in meta");
2,741✔
1171

1172
  TAOS_RETURN(code);
2,741✔
1173
}
1174

1175
int32_t walRemoveMeta(SWal* pWal) {
85✔
1176
  int metaVer = walFindCurMetaVer(pWal);
85✔
1177
  if (metaVer == -1) return 0;
85✔
1178
  char fnameStr[WAL_FILE_LEN];
1179
  int  n = walBuildMetaName(pWal, metaVer, fnameStr);
84✔
1180
  if (n >= sizeof(fnameStr)) {
84!
1181
    TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
×
1182
  }
1183
  return taosRemoveFile(fnameStr);
84✔
1184
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc