• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5071

17 May 2026 01:15AM UTC coverage: 63.054% (-10.3%) from 73.326%
#5071

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

238317 of 377957 relevant lines covered (63.05%)

130539817.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.42
/source/dnode/vnode/src/tsdb/tsdbRepair.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dmRepair.h"
17
#include "tsdbDataFileRAW.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbFS2.h"
20
#include "tsdbSttFileRW.h"
21

22
// Report whether the forced TSDB repair path applies to this file system.
// Every dmRepair gate must pass, checked in this order: repair flow enabled,
// node type is vnode, mode is force, and the vgroup is flagged for TSDB repair.
bool tsdbShouldForceRepair(STFileSystem *fs) {
  int32_t vgId = TD_VID(fs->tsdb->pVnode);

  bool globalGatesOpen = dmRepairFlowEnabled() && dmRepairNodeTypeIsVnode() && dmRepairModeIsForce();

  return globalGatesOpen && dmRepairNeedTsdbRepair(vgId);
}
43

44
// Action chosen for a core file group or an stt file once its blocks have been counted.
// See tsdbRepairResolveCoreAction()/tsdbRepairResolveSttAction() for how kept/dropped
// counters map onto these values.
typedef enum {
45
  TSDB_REPAIR_ACTION_KEEP = 0,  // nothing was dropped
46
  TSDB_REPAIR_ACTION_DROP = 1,  // something was dropped and nothing was kept
47
  TSDB_REPAIR_ACTION_REBUILD = 2,  // something was dropped but some blocks were kept
48
} ETsdbRepairAction;
49

50
// Internal repair mode; tsdbRepairResolveMode() derives it from an EDmRepairStrategy.
typedef enum {
51
  TSDB_REPAIR_MODE_DROP_INVALID_ONLY = 0,  // also the fallback for unrecognized strategies
52
  TSDB_REPAIR_MODE_HEAD_ONLY_REBUILD = 1,
53
  TSDB_REPAIR_MODE_FULL_REBUILD = 2,
54
} ETsdbRepairMode;
55

56
// TARRAY2 container of SBrinRecord values; taken as the record sink by tsdbRepairCollectHeadOnlyRecords().
typedef TARRAY2(SBrinRecord) TBrinRecordArray;
57

58
// Bit flags combined into the op mask returned by tsdbRepairDescribeHeadOnlyOps().
// That function only ever sets CREATE_HEAD, REMOVE_HEAD and REMOVE_SMA; the remaining
// bits are not set anywhere in the code visible here (NOTE(review): confirm other users).
enum {
59
  TSDB_REPAIR_HEAD_OP_REMOVE_HEAD = 1 << 0,
60
  TSDB_REPAIR_HEAD_OP_CREATE_HEAD = 1 << 1,
61
  TSDB_REPAIR_HEAD_OP_REMOVE_DATA = 1 << 2,
62
  TSDB_REPAIR_HEAD_OP_CREATE_DATA = 1 << 3,
63
  TSDB_REPAIR_HEAD_OP_REMOVE_SMA = 1 << 4,
64
  TSDB_REPAIR_HEAD_OP_CREATE_SMA = 1 << 5,
65
};
66

67
// Outcome of analyzing one core file group; filled by tsdbRepairAnalyzeCore() and
// completed by tsdbRepairFinalizeCoreResult().
typedef struct {
68
  ETsdbRepairAction action;  // starts as KEEP (see tsdbRepairCoreResultInit)
69
  int32_t           keptBlocks;  // data blocks that were read back and looked valid
70
  int32_t           droppedBlocks;  // brin blocks/records or data blocks that failed to read or validate
71
  const char       *reason;  // first recorded failure reason (string literal), NULL when none
72
} STsdbRepairCoreResult;
73

74
// Outcome of analyzing one stt file; completed by tsdbRepairFinalizeSttResult().
// The code that fills the counters is not part of the visible section.
typedef struct {
75
  ETsdbRepairAction action;  // starts as KEEP (see tsdbRepairSttResultInit)
76
  bool              rewriteRequired;  // forces REBUILD/DROP at finalize time even if nothing was dropped
77
  int32_t           keptDataBlocks;
78
  int32_t           droppedDataBlocks;
79
  int32_t           keptTombBlocks;
80
  int32_t           droppedTombBlocks;
81
  const char       *reason;  // failure reason, defaulted to "damaged_stt_file" at finalize when action != KEEP
82
} STsdbRepairSttResult;
83

84
// Translate a TSDB repair strategy into its lowercase label.
// Any value other than the three TSDB strategies yields "unknown".
const char *tsdbRepairStrategyName(EDmRepairStrategy strategy) {
  const char *label = "unknown";

  if (strategy == DM_REPAIR_STRATEGY_TSDB_DROP_INVALID_ONLY) {
    label = "drop_invalid_only";
  } else if (strategy == DM_REPAIR_STRATEGY_TSDB_HEAD_ONLY_REBUILD) {
    label = "head_only_rebuild";
  } else if (strategy == DM_REPAIR_STRATEGY_TSDB_FULL_REBUILD) {
    label = "full_rebuild";
  }

  return label;
}
96

97
// Replace DM_REPAIR_STRATEGY_NONE with the default TSDB strategy (drop invalid only);
// every other strategy is returned unchanged.
EDmRepairStrategy tsdbRepairNormalizeStrategy(EDmRepairStrategy strategy) {
  if (strategy != DM_REPAIR_STRATEGY_NONE) {
    return strategy;
  }

  return DM_REPAIR_STRATEGY_TSDB_DROP_INVALID_ONLY;
}
100

101
// Map a (normalized) repair strategy onto an ETsdbRepairMode value.
// Strategies other than head-only or full rebuild resolve to drop-invalid-only.
int32_t tsdbRepairResolveMode(EDmRepairStrategy strategy) {
  EDmRepairStrategy normalized = tsdbRepairNormalizeStrategy(strategy);

  if (normalized == DM_REPAIR_STRATEGY_TSDB_HEAD_ONLY_REBUILD) {
    return TSDB_REPAIR_MODE_HEAD_ONLY_REBUILD;
  }
  if (normalized == DM_REPAIR_STRATEGY_TSDB_FULL_REBUILD) {
    return TSDB_REPAIR_MODE_FULL_REBUILD;
  }

  return TSDB_REPAIR_MODE_DROP_INVALID_ONLY;
}
112

113
// Copy *src into *dst for a head-only rebuild. When keepSma is false the copy's
// smaOffset/smaSize are zeroed. Does nothing if either pointer is NULL.
void tsdbRepairBuildHeadOnlyBrinRecord(const SBrinRecord *src, bool keepSma, SBrinRecord *dst) {
  if (src != NULL && dst != NULL) {
    *dst = *src;

    if (!keepSma) {
      dst->smaSize = 0;
      dst->smaOffset = 0;
    }
  }
}
124

125
int32_t tsdbRepairDescribeHeadOnlyOps(bool hasHead, bool hasSma, bool dropSma) {
×
126
  int32_t opMask = TSDB_REPAIR_HEAD_OP_CREATE_HEAD;
×
127

128
  if (hasHead) {
×
129
    opMask |= TSDB_REPAIR_HEAD_OP_REMOVE_HEAD;
×
130
  }
131
  if (hasSma && dropSma) {
×
132
    opMask |= TSDB_REPAIR_HEAD_OP_REMOVE_SMA;
×
133
  }
134

135
  return opMask;
×
136
}
137

138
bool tsdbRepairDataBlockLooksValid(const SBlockData *blockData, const SBrinRecord *record) {
×
139
  if (blockData == NULL || record == NULL) {
×
140
    return false;
×
141
  }
142

143
  if (blockData->nRow <= 0) {
×
144
    return false;
×
145
  }
146

147
  if (blockData->suid != record->suid || blockData->uid != record->uid) {
×
148
    return false;
×
149
  }
150

151
  if (blockData->aTSKEY == NULL || blockData->aVersion == NULL) {
×
152
    return false;
×
153
  }
154

155
  return true;
×
156
}
157

158
bool tsdbRepairSttBlockLooksValid(const SBlockData *blockData) {
×
159
  if (blockData == NULL) {
×
160
    return false;
×
161
  }
162

163
  if (blockData->nRow <= 0) {
×
164
    return false;
×
165
  }
166

167
  if (blockData->aTSKEY == NULL || blockData->aVersion == NULL) {
×
168
    return false;
×
169
  }
170

171
  if (blockData->uid == 0 && blockData->aUid == NULL) {
×
172
    return false;
×
173
  }
174

175
  return true;
×
176
}
177

178
int32_t tsdbRepairResolveCoreAction(int32_t keptBlocks, int32_t droppedBlocks) {
×
179
  if (droppedBlocks <= 0) {
×
180
    return TSDB_REPAIR_ACTION_KEEP;
×
181
  }
182

183
  return keptBlocks > 0 ? TSDB_REPAIR_ACTION_REBUILD : TSDB_REPAIR_ACTION_DROP;
×
184
}
185

186
int32_t tsdbRepairResolveSttAction(int32_t keptDataBlocks, int32_t keptTombBlocks, int32_t droppedDataBlocks,
×
187
                                   int32_t droppedTombBlocks) {
188
  if (droppedDataBlocks <= 0 && droppedTombBlocks <= 0) {
×
189
    return TSDB_REPAIR_ACTION_KEEP;
×
190
  }
191

192
  return (keptDataBlocks > 0 || keptTombBlocks > 0) ? TSDB_REPAIR_ACTION_REBUILD : TSDB_REPAIR_ACTION_DROP;
×
193
}
194

195
// The rebuilt core writer is aborted when no block was rebuilt.
bool tsdbRepairShouldAbortCoreWriterClose(int32_t rebuiltBlocks) {
  bool wroteSomething = rebuiltBlocks > 0;
  return !wroteSomething;
}
×
196

197
// The rebuilt stt writer is aborted when neither a data block nor a tomb block was rebuilt.
bool tsdbRepairShouldAbortSttWriterClose(int32_t rebuiltDataBlocks, int32_t rebuiltTombBlocks) {
  bool wroteSomething = rebuiltDataBlocks > 0 || rebuiltTombBlocks > 0;
  return !wroteSomething;
}
200

201
// Flush threshold for head-only brin blocks: maxRows when positive, otherwise 1.
int32_t tsdbRepairResolveHeadOnlyBrinFlushThreshold(int32_t maxRows) {
  if (maxRows <= 0) {
    return 1;
  }
  return maxRows;
}
×
202

203
// True once the number of buffered records has reached the flush threshold derived from maxRows.
bool tsdbRepairShouldFlushHeadOnlyBrinBlock(int32_t numOfRecords, int32_t maxRows) {
  int32_t threshold = tsdbRepairResolveHeadOnlyBrinFlushThreshold(maxRows);

  return numOfRecords >= threshold;
}
206

207
bool tsdbRepairShouldRetryHeadOnlyBrinPut(int32_t putCode, int32_t numOfRecords, int32_t maxRows) {
×
208
  return putCode == TSDB_CODE_INVALID_PARA && tsdbRepairShouldFlushHeadOnlyBrinBlock(numOfRecords, maxRows);
×
209
}
210

211
// Lowercase label for a repair action, used in log messages; "unknown" for any other value.
static const char *tsdbRepairActionName(ETsdbRepairAction action) {
  if (action == TSDB_REPAIR_ACTION_KEEP) {
    return "keep";
  }
  if (action == TSDB_REPAIR_ACTION_DROP) {
    return "drop";
  }
  if (action == TSDB_REPAIR_ACTION_REBUILD) {
    return "rebuild";
  }

  return "unknown";
}
223

224
static EDmRepairStrategy tsdbRepairGetStrategyForFileSet(int32_t vgId, int32_t fid) {
×
225
  const SRepairTsdbFileOpt *pOpt = dmRepairGetTsdbFileOpt(vgId, fid);
×
226

227
  return tsdbRepairNormalizeStrategy(pOpt == NULL ? DM_REPAIR_STRATEGY_NONE : pOpt->strategy);
×
228
}
229

230
bool tsdbRepairShouldProcessFileSet(int32_t vnodeId, int32_t fid) {
×
231
  return dmRepairGetTsdbFileOpt(vnodeId, fid) != NULL;
×
232
}
233

234
// Lowercase label for a repair mode; "unknown" for any value outside ETsdbRepairMode.
static const char *tsdbRepairModeName(ETsdbRepairMode mode) {
  const char *label = "unknown";

  if (mode == TSDB_REPAIR_MODE_DROP_INVALID_ONLY) {
    label = "drop_invalid_only";
  } else if (mode == TSDB_REPAIR_MODE_HEAD_ONLY_REBUILD) {
    label = "head_only_rebuild";
  } else if (mode == TSDB_REPAIR_MODE_FULL_REBUILD) {
    label = "full_rebuild";
  }

  return label;
}
246

247
// Record `value` as the failure reason only if no reason has been recorded yet.
// The first reason wins; NULL `reason` or NULL `value` is ignored.
static void tsdbRepairSetReason(const char **reason, const char *value) {
  bool canStore = reason != NULL && *reason == NULL && value != NULL;

  if (canStore) {
    *reason = value;
  }
}
254

255
// Reset a core analysis result: all bytes zeroed, action explicitly set to KEEP.
static void tsdbRepairCoreResultInit(STsdbRepairCoreResult *result) {
  memset(result, 0, sizeof(STsdbRepairCoreResult));

  result->action = TSDB_REPAIR_ACTION_KEEP;
}
×
259

260
// Reset an stt analysis result: all bytes zeroed, action explicitly set to KEEP.
static void tsdbRepairSttResultInit(STsdbRepairSttResult *result) {
  memset(result, 0, sizeof(STsdbRepairSttResult));

  result->action = TSDB_REPAIR_ACTION_KEEP;
}
×
264

265
// Settle the final action of a core analysis result.
// An action still at KEEP is re-derived from the kept/dropped counters; an action
// already forced to something else is left alone. Any non-KEEP result without a
// recorded reason gets the generic "damaged_core_group".
static void tsdbRepairFinalizeCoreResult(STsdbRepairCoreResult *result) {
  if (result->action == TSDB_REPAIR_ACTION_KEEP) {
    int32_t resolved = tsdbRepairResolveCoreAction(result->keptBlocks, result->droppedBlocks);
    result->action = (ETsdbRepairAction)resolved;
  }

  bool needsDefaultReason = result->action != TSDB_REPAIR_ACTION_KEEP && result->reason == NULL;
  if (needsDefaultReason) {
    result->reason = "damaged_core_group";
  }
}
×
274

275
// Settle the final action of an stt analysis result.
//  1. An action still at KEEP is re-derived from the data/tomb counters.
//  2. If it is still KEEP but a rewrite is required, it becomes REBUILD when any
//     block was kept and DROP otherwise.
//  3. Any non-KEEP result without a recorded reason gets "damaged_stt_file".
static void tsdbRepairFinalizeSttResult(STsdbRepairSttResult *result) {
  if (result->action == TSDB_REPAIR_ACTION_KEEP) {
    int32_t resolved = tsdbRepairResolveSttAction(result->keptDataBlocks, result->keptTombBlocks,
                                                  result->droppedDataBlocks, result->droppedTombBlocks);
    result->action = (ETsdbRepairAction)resolved;
  }

  if (result->action == TSDB_REPAIR_ACTION_KEEP && result->rewriteRequired) {
    bool anyKept = result->keptDataBlocks > 0 || result->keptTombBlocks > 0;
    result->action = anyKept ? TSDB_REPAIR_ACTION_REBUILD : TSDB_REPAIR_ACTION_DROP;
  }

  if (result->action != TSDB_REPAIR_ACTION_KEEP && result->reason == NULL) {
    result->reason = "damaged_stt_file";
  }
}
×
290

291
static bool tsdbRepairFileMissing(const STFileObj *fobj) {
×
292
  if (fobj == NULL) {
×
293
    return false;
×
294
  }
295

296
  return !taosCheckExistFile(fobj->fname);
×
297
}
298

299
// A file counts as affected only when it is missing on disk.
bool tsdbRepairFileAffected(const STFileObj *fobj) {
  // Size is deliberately not compared here: file length may legitimately differ
  // from current.json across crash windows, so the deep scan decides whether the
  // payload is still readable rather than this pre-check dropping the file.
  bool missing = tsdbRepairFileMissing(fobj);

  return missing;
}
304

305
// Describe what is wrong with a file: "missing" when the file object exists but
// its path does not, NULL when there is no file object or nothing is wrong.
const char *tsdbRepairFileIssue(const STFileObj *fobj) {
  if (fobj != NULL && tsdbRepairFileMissing(fobj)) {
    return "missing";
  }

  return NULL;
}
316

317
static bool tsdbRepairCoreFilesAffected(const STFileSet *pFileSet) {
×
318
  return tsdbRepairFileAffected(pFileSet->farr[TSDB_FTYPE_HEAD]) ||
×
319
         tsdbRepairFileAffected(pFileSet->farr[TSDB_FTYPE_DATA]);
×
320
}
321

322
// Append a TSDB_FOP_REMOVE operation for `file` (copied by value) to opArr.
// Returns whatever TARRAY2_APPEND returns.
static int32_t tsdbRepairAppendRemoveFileOp(TFileOpArray *opArr, int32_t fid, const STFile *file) {
  STFileOp removeOp = {0};

  removeOp.optype = TSDB_FOP_REMOVE;
  removeOp.fid = fid;
  removeOp.of = *file;

  return TARRAY2_APPEND(opArr, removeOp);
}
331

332
// Append a remove operation for the file held by fobj. A NULL fobj is a no-op that returns 0.
static int32_t tsdbRepairAppendRemoveFileObjOp(TFileOpArray *opArr, int32_t fid, const STFileObj *fobj) {
  if (fobj != NULL) {
    return tsdbRepairAppendRemoveFileOp(opArr, fid, fobj->f);
  }

  return 0;
}
339

340
// Copy every file operation from src to the end of dst, in order.
// Stops at the first failing append and returns its code; 0 on success.
static int32_t tsdbRepairAppendOps(TFileOpArray *dst, const TFileOpArray *src) {
  for (int32_t iOp = 0; iOp < TARRAY2_SIZE(src); ++iOp) {
    const STFileOp *pOp = TARRAY2_GET_PTR(src, iOp);

    int32_t code = TARRAY2_APPEND(dst, *pOp);
    if (code != 0) {
      return code;
    }
  }

  return 0;
}
352

353
// Queue removal of the whole core group of a file set: head, then data, then sma.
// File types absent from the set are skipped. Returns the first non-zero append code, or 0.
static int32_t tsdbRepairAppendRemoveCoreOps(const STFileSet *pFileSet, TFileOpArray *opArr) {
  const int32_t coreTypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};
  const int32_t numCoreTypes = (int32_t)(sizeof(coreTypes) / sizeof(coreTypes[0]));

  for (int32_t i = 0; i < numCoreTypes; ++i) {
    int32_t code = tsdbRepairAppendRemoveFileObjOp(opArr, pFileSet->fid, pFileSet->farr[coreTypes[i]]);
    if (code != 0) {
      return code;
    }
  }

  return 0;
}
368

369
// Assemble the file operations of a head-only rebuild into opArr:
// remove the old head file (if one exists), remove the sma file (if one exists and
// dropSma is set), then append the operations produced by the writer.
// Returns the first non-zero append code, or 0.
static int32_t tsdbRepairAppendHeadOnlyRebuildOps(const STFileSet *pFileSet, bool dropSma,
                                                  const TFileOpArray *writerOps, TFileOpArray *opArr) {
  bool    hasHead = pFileSet->farr[TSDB_FTYPE_HEAD] != NULL;
  bool    hasSma = pFileSet->farr[TSDB_FTYPE_SMA] != NULL;
  int32_t plannedOps = tsdbRepairDescribeHeadOnlyOps(hasHead, hasSma, dropSma);
  int32_t code = 0;

  if (plannedOps & TSDB_REPAIR_HEAD_OP_REMOVE_HEAD) {
    code = tsdbRepairAppendRemoveFileObjOp(opArr, pFileSet->fid, pFileSet->farr[TSDB_FTYPE_HEAD]);
    if (code != 0) {
      return code;
    }
  }

  if (plannedOps & TSDB_REPAIR_HEAD_OP_REMOVE_SMA) {
    code = tsdbRepairAppendRemoveFileObjOp(opArr, pFileSet->fid, pFileSet->farr[TSDB_FTYPE_SMA]);
    if (code != 0) {
      return code;
    }
  }

  return tsdbRepairAppendOps(opArr, writerOps);
}
391

392
// Queue remove operations for every file of the set that is already known to be
// bad (currently: missing on disk) before any deep scan runs.
//  - If the head or data file is bad, the whole core group (head, data, sma) is removed.
//  - Otherwise a bad sma file is removed on its own.
//  - A bad tomb file and each bad stt file are removed individually.
// Returns 0 on success, or the first non-zero code from appending an operation.
static int32_t tsdbForceRepairFileSetBadFiles(STFileSystem *pFS, const STFileSet *pFileSet, TFileOpArray *opArr) {
  const int32_t vgId = TD_VID(pFS->tsdb->pVnode);
  const int32_t fid = pFileSet->fid;
  STFileObj    *pSma = pFileSet->farr[TSDB_FTYPE_SMA];
  STFileObj    *pTomb = pFileSet->farr[TSDB_FTYPE_TOMB];
  const char   *headIssue = tsdbRepairFileIssue(pFileSet->farr[TSDB_FTYPE_HEAD]);
  const char   *dataIssue = tsdbRepairFileIssue(pFileSet->farr[TSDB_FTYPE_DATA]);
  const char   *smaIssue = tsdbRepairFileIssue(pSma);
  const char   *tombIssue = NULL;
  SSttLvl      *sttLevel = NULL;
  bool          coreBroken = headIssue != NULL || dataIssue != NULL;
  int32_t       code = 0;

  if (coreBroken) {
    tsdbWarn("vgId:%d fid:%d drop core group before deep scan, head:%s data:%s", vgId, fid,
             headIssue == NULL ? "ok" : headIssue, dataIssue == NULL ? "ok" : dataIssue);
    if (smaIssue != NULL) {
      tsdbTrace("vgId:%d fid:%d affected sma file is covered by core-group drop, reason:%s", vgId, fid, smaIssue);
    }

    code = tsdbRepairAppendRemoveCoreOps(pFileSet, opArr);
    if (code != 0) {
      tsdbError("vgId:%d fid:%d failed to append remove ops for affected core group since %s, code:%d", vgId, fid,
                tstrerror(code), code);
      return code;
    }
  } else if (smaIssue != NULL) {
    tsdbWarn("vgId:%d fid:%d remove affected sma file before deep scan, reason:%s", vgId, fid, smaIssue);

    code = tsdbRepairAppendRemoveFileObjOp(opArr, fid, pSma);
    if (code != 0) {
      tsdbError("vgId:%d fid:%d failed to append remove op for affected sma file since %s, code:%d", vgId, fid,
                tstrerror(code), code);
      return code;
    }
  }

  // tsdbRepairFileIssue() yields NULL for a NULL file object, so an absent tomb file is skipped.
  tombIssue = tsdbRepairFileIssue(pTomb);
  if (tombIssue != NULL) {
    tsdbWarn("vgId:%d fid:%d remove affected tomb file before deep scan, reason:%s", vgId, fid, tombIssue);

    code = tsdbRepairAppendRemoveFileObjOp(opArr, fid, pTomb);
    if (code != 0) {
      tsdbError("vgId:%d fid:%d failed to append remove op for affected tomb file since %s, code:%d", vgId, fid,
                tstrerror(code), code);
      return code;
    }
  }

  TARRAY2_FOREACH(pFileSet->lvlArr, sttLevel) {
    STFileObj *pStt = NULL;

    TARRAY2_FOREACH(sttLevel->fobjArr, pStt) {
      const char *sttIssue = tsdbRepairFileIssue(pStt);

      if (sttIssue != NULL) {
        tsdbWarn("vgId:%d fid:%d level:%d remove affected stt file before deep scan, reason:%s file:%s", vgId, fid,
                 sttLevel->level, sttIssue, pStt->fname);

        code = tsdbRepairAppendRemoveFileObjOp(opArr, fid, pStt);
        if (code != 0) {
          tsdbError("vgId:%d fid:%d level:%d failed to append remove op for affected stt file:%s since %s, code:%d",
                    vgId, fid, sttLevel->level, pStt->fname, tstrerror(code), code);
          return code;
        }
      }
    }
  }

  return 0;
}
463

464
// Deep-scan the core group (head + data) of a file set and fill *result.
//
// Every brin block, every brin record inside it and the data block each record
// points to are read back. Readable, valid data blocks increment keptBlocks;
// anything unreadable or invalid increments droppedBlocks and records the first
// failure reason. Failing to open the reader or to read the brin index forces the
// action to DROP. On success the result is finalized before returning.
//
// Returns 0 unless tBrinBlockInit() fails, in which case that code is returned and
// the result is left un-finalized. If the head or data file is absent or already
// affected, the scan is skipped and 0 is returned with *result untouched.
static int32_t tsdbRepairAnalyzeCore(STFileSystem *pFS, const STFileSet *pFileSet, STsdbRepairCoreResult *result) {
  const int32_t         vgId = TD_VID(pFS->tsdb->pVnode);
  const int32_t         fid = pFileSet->fid;
  int32_t               code = 0;
  SDataFileReader      *reader = NULL;
  const TBrinBlkArray  *brinBlks = NULL;
  SBlockData            payload = {0};
  SDataFileReaderConfig readerConfig = {
      .tsdb = pFS->tsdb,
      .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
  };

  bool coreScannable = pFileSet->farr[TSDB_FTYPE_HEAD] != NULL && pFileSet->farr[TSDB_FTYPE_DATA] != NULL &&
                       !tsdbRepairCoreFilesAffected(pFileSet);
  if (!coreScannable) {
    tsdbTrace("vgId:%d fid:%d skip core deep scan because core files are already affected or absent", vgId, fid);
    return 0;
  }

  tsdbDebug("vgId:%d fid:%d start analyzing core group", vgId, fid);

  // Expose only the files that are present and not affected to the reader.
  for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
    STFileObj *fobj = pFileSet->farr[ftype];

    if (fobj != NULL && !tsdbRepairFileAffected(fobj)) {
      readerConfig.files[ftype].exist = true;
      readerConfig.files[ftype].file = fobj->f[0];
    }
  }

  code = tsdbDataFileReaderOpen(NULL, &readerConfig, &reader);
  if (code != 0) {
    result->action = TSDB_REPAIR_ACTION_DROP;
    tsdbRepairSetReason(&result->reason, "open_core_reader_failed");
    tsdbWarn("vgId:%d fid:%d failed to open core reader, downgrade action to %s since %s", vgId, fid,
             tsdbRepairActionName(result->action), tstrerror(code));
    code = 0;
    goto _exit;
  }

  code = tsdbDataFileReadBrinBlk(reader, &brinBlks);
  if (code != 0) {
    result->action = TSDB_REPAIR_ACTION_DROP;
    tsdbRepairSetReason(&result->reason, "read_brin_index_failed");
    tsdbWarn("vgId:%d fid:%d failed to read brin index, downgrade action to %s since %s", vgId, fid,
             tsdbRepairActionName(result->action), tstrerror(code));
    code = 0;
    goto _exit;
  }

  for (int32_t iBlk = 0; iBlk < TARRAY2_SIZE(brinBlks); ++iBlk) {
    const SBrinBlk *pBrinBlk = TARRAY2_GET_PTR(brinBlks, iBlk);
    SBrinBlock      brinBlock = {0};

    code = tBrinBlockInit(&brinBlock);
    if (code != 0) {
      tsdbError("vgId:%d fid:%d failed to init brin block during core analysis since %s, code:%d", vgId, fid,
                tstrerror(code), code);
      goto _exit;
    }

    code = tsdbDataFileReadBrinBlock(reader, pBrinBlk, &brinBlock);
    if (code != 0) {
      result->droppedBlocks++;
      tsdbRepairSetReason(&result->reason, "damaged_brin_block");
      tsdbWarn("vgId:%d fid:%d skip damaged brin block #%d since %s", vgId, fid, iBlk, tstrerror(code));
      code = 0;
      tBrinBlockDestroy(&brinBlock);
      continue;
    }

    for (int32_t iRecord = 0; iRecord < BRIN_BLOCK_SIZE(&brinBlock); ++iRecord) {
      SBrinRecord record = {0};

      code = tBrinBlockGet(&brinBlock, iRecord, &record);
      if (code != 0) {
        // An undecodable record ends the scan of this brin block.
        result->droppedBlocks++;
        tsdbRepairSetReason(&result->reason, "damaged_brin_record");
        tsdbWarn("vgId:%d fid:%d stop scanning damaged brin record in block #%d record #%d since %s", vgId, fid,
                 iBlk, iRecord, tstrerror(code));
        code = 0;
        break;
      }

      tBlockDataClear(&payload);
      code = tsdbDataFileReadBlockData(reader, &record, &payload);

      bool blockOk = (code == 0) && tsdbRepairDataBlockLooksValid(&payload, &record);
      if (blockOk) {
        result->keptBlocks++;
        tsdbTrace("vgId:%d fid:%d keep core block, blockOffset:%" PRId64 " blockSize:%d uid:%" PRId64
                  " suid:%" PRId64,
                  vgId, fid, record.blockOffset, record.blockSize, record.uid, record.suid);
        tBlockDataClear(&payload);
        continue;
      }

      result->droppedBlocks++;
      tsdbRepairSetReason(&result->reason, code != 0 ? "read_data_block_failed" : "invalid_data_block");
      tsdbWarn("vgId:%d fid:%d drop damaged core block, blockOffset:%" PRId64 " blockSize:%d uid:%" PRId64
               " suid:%" PRId64 " reason:%s",
               vgId, fid, record.blockOffset, record.blockSize, record.uid, record.suid,
               code != 0 ? tstrerror(code) : "invalid_block_payload");
      code = 0;
      tBlockDataClear(&payload);
    }

    // Every path out of the record loop leaves code == 0.
    tBrinBlockDestroy(&brinBlock);
  }

_exit:
  if (code == 0) {
    tsdbRepairFinalizeCoreResult(result);
    tsdbDebug("vgId:%d fid:%d core analysis finished, action:%s keptBlocks:%d droppedBlocks:%d reason:%s", vgId,
              fid, tsdbRepairActionName(result->action), result->keptBlocks, result->droppedBlocks,
              result->reason == NULL ? "healthy" : result->reason);
  }

  tBlockDataDestroy(&payload);
  tsdbDataFileReaderClose(&reader);
  return code;
}
585

586
// Rebuild the core group of a file set by copying every readable, valid data
// block into a freshly opened data-file writer.
//
// pFS            file system that owns the file set
// pFileSet       file set whose core group is rebuilt
// writerOps      receives the writer's file operations when the rebuild is committed
// rebuiltBlocks  in/out counter, incremented once per block written
//
// Returns 0 on success. Failing to reopen the reader or to reread the brin index
// is not an error: the function returns 0 with *rebuiltBlocks unchanged so the
// caller can fall back to dropping the group. Damaged brin blocks, records and
// data blocks are skipped. A failure to init a brin block, open the writer, write
// a block or close the writer is returned as a non-zero code.
//
// The writer is aborted (nothing committed, writerOps untouched) when no block
// was rebuilt OR when the function is leaving with an error, so a rebuild that
// failed half-way never publishes a partial core group.
static int32_t tsdbRepairRebuildCore(STFileSystem *pFS, const STFileSet *pFileSet, TFileOpArray *writerOps,
                                     int32_t *rebuiltBlocks) {
  int32_t               code = 0;
  int32_t               vgId = TD_VID(pFS->tsdb->pVnode);
  SDataFileReader      *reader = NULL;
  SDataFileWriter      *writer = NULL;
  SBlockData            blockData = {0};
  const TBrinBlkArray  *brinBlkArray = NULL;
  SDataFileReaderConfig readerConfig = {
      .tsdb = pFS->tsdb,
      .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
  };
  SDataFileWriterConfig writerConfig = {
      .tsdb = pFS->tsdb,
      .cmprAlg = pFS->tsdb->pVnode->config.tsdbCfg.compression,
      .maxRow = pFS->tsdb->pVnode->config.tsdbCfg.maxRows,
      .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
      .fid = pFileSet->fid,
      .cid = tsdbFSAllocEid(pFS),
      .expLevel = tsdbFidLevel(pFileSet->fid, &pFS->tsdb->keepCfg, taosGetTimestampSec()),
      .compactVersion = INT64_MAX,
      .lcn = pFileSet->farr[TSDB_FTYPE_DATA] ? pFileSet->farr[TSDB_FTYPE_DATA]->f->lcn : 0,
  };

  // Expose only the files that are present and not affected to the reader.
  for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (pFileSet->farr[ftype] == NULL || tsdbRepairFileAffected(pFileSet->farr[ftype])) {
      continue;
    }

    readerConfig.files[ftype].exist = true;
    readerConfig.files[ftype].file = pFileSet->farr[ftype]->f[0];
  }

  code = tsdbDataFileReaderOpen(NULL, &readerConfig, &reader);
  if (code != 0) {
    tsdbWarn("vgId:%d fid:%d failed to reopen core reader for rebuild since %s, fallback to drop", vgId, pFileSet->fid,
             tstrerror(code));
    code = 0;
    goto _exit;
  }

  code = tsdbDataFileReadBrinBlk(reader, &brinBlkArray);
  if (code != 0) {
    tsdbWarn("vgId:%d fid:%d failed to reread brin index for rebuild since %s, fallback to drop", vgId, pFileSet->fid,
             tstrerror(code));
    code = 0;
    goto _exit;
  }

  code = tsdbDataFileWriterOpen(&writerConfig, &writer);
  if (code != 0) {
    tsdbError("vgId:%d fid:%d failed to open core writer since %s, code:%d", vgId, pFileSet->fid, tstrerror(code),
              code);
    goto _exit;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(brinBlkArray); ++i) {
    const SBrinBlk *pBrinBlk = TARRAY2_GET_PTR(brinBlkArray, i);
    SBrinBlock      brinBlock = {0};

    code = tBrinBlockInit(&brinBlock);
    if (code != 0) {
      tsdbError("vgId:%d fid:%d failed to init brin block during core rebuild since %s, code:%d", vgId, pFileSet->fid,
                tstrerror(code), code);
      goto _exit;
    }

    code = tsdbDataFileReadBrinBlock(reader, pBrinBlk, &brinBlock);
    if (code != 0) {
      tsdbWarn("vgId:%d fid:%d skip damaged brin block #%d during core rebuild since %s", vgId, pFileSet->fid, i,
               tstrerror(code));
      code = 0;
      tBrinBlockDestroy(&brinBlock);
      continue;
    }

    for (int32_t iRecord = 0; iRecord < BRIN_BLOCK_SIZE(&brinBlock); ++iRecord) {
      SBrinRecord record = {0};

      code = tBrinBlockGet(&brinBlock, iRecord, &record);
      if (code != 0) {
        // An undecodable record ends the scan of this brin block.
        tsdbWarn("vgId:%d fid:%d stop scanning damaged brin record in block #%d during rebuild since %s", vgId,
                 pFileSet->fid, i, tstrerror(code));
        code = 0;
        break;
      }

      tBlockDataClear(&blockData);
      code = tsdbDataFileReadBlockData(reader, &record, &blockData);
      if (code != 0 || !tsdbRepairDataBlockLooksValid(&blockData, &record)) {
        tsdbTrace("vgId:%d fid:%d skip invalid core block during rebuild, blockOffset:%" PRId64 " reason:%s", vgId,
                  pFileSet->fid, record.blockOffset, code != 0 ? tstrerror(code) : "invalid_block_payload");
        code = 0;
        tBlockDataClear(&blockData);
        continue;
      }

      code = tsdbDataFileWriteBlockData(writer, &blockData);
      tBlockDataClear(&blockData);
      if (code != 0) {
        tsdbError("vgId:%d fid:%d failed to write rebuilt core block since %s, code:%d", vgId, pFileSet->fid,
                  tstrerror(code), code);
        tBrinBlockDestroy(&brinBlock);
        goto _exit;
      }

      (*rebuiltBlocks)++;
      tsdbTrace("vgId:%d fid:%d rebuilt core block, blockOffset:%" PRId64 " total:%d", vgId, pFileSet->fid,
                record.blockOffset, *rebuiltBlocks);
    }

    tBrinBlockDestroy(&brinBlock);
    if (code != 0) {
      goto _exit;
    }
  }

_exit:
  tBlockDataDestroy(&blockData);
  if (writer != NULL) {
    // Never commit a writer on an error path: blocks written before the failure
    // would otherwise be published as a partial core group while the caller is
    // told the rebuild failed.
    bool    abort = (code != 0) || tsdbRepairShouldAbortCoreWriterClose(*rebuiltBlocks);
    int32_t closeCode = tsdbDataFileWriterClose(&writer, abort, abort ? NULL : writerOps);
    if (closeCode != 0) {
      tsdbError("vgId:%d fid:%d failed to close rebuilt core writer since %s, code:%d", vgId, pFileSet->fid,
                tstrerror(closeCode), closeCode);
      code = code == 0 ? closeCode : code;
    }
  }
  tsdbDataFileReaderClose(&reader);
  return code;
}
718

719
static int32_t tsdbRepairCollectHeadOnlyRecords(STFileSystem *pFS, const STFileSet *pFileSet,
×
720
                                                TBrinRecordArray *recordArr, bool *dropSma, int32_t *rebuiltBlocks) {
721
  int32_t              code = 0;
×
722
  int32_t              vgId = TD_VID(pFS->tsdb->pVnode);
×
723
  SDataFileReader     *reader = NULL;
×
724
  SBlockData           blockData = {0};
×
725
  const TBrinBlkArray *brinBlkArray = NULL;
×
726
  bool                 hasReadableSma =
×
727
      pFileSet->farr[TSDB_FTYPE_SMA] != NULL && !tsdbRepairFileAffected(pFileSet->farr[TSDB_FTYPE_SMA]);
×
728
  bool                  keepSma = hasReadableSma;
×
729
  TColumnDataAggArray   columnDataAggArray[1];
×
730
  SDataFileReaderConfig readerConfig = {
×
731
      .tsdb = pFS->tsdb,
×
732
      .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
×
733
  };
734

735
  TARRAY2_INIT(columnDataAggArray);
×
736

737
  tsdbInfo("vgId:%d fid:%d start collecting head-only rebuild records, keepSma:%s", vgId, pFileSet->fid,
×
738
           hasReadableSma ? "yes" : "no");
739

740
  for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
×
741
    if (pFileSet->farr[ftype] == NULL || tsdbRepairFileAffected(pFileSet->farr[ftype])) {
×
742
      continue;
×
743
    }
744

745
    readerConfig.files[ftype].exist = true;
×
746
    readerConfig.files[ftype].file = pFileSet->farr[ftype]->f[0];
×
747
  }
748

749
  code = tsdbDataFileReaderOpen(NULL, &readerConfig, &reader);
×
750
  if (code != 0) {
×
751
    tsdbWarn("vgId:%d fid:%d failed to reopen core reader for head-only rebuild since %s, fallback to drop", vgId,
×
752
             pFileSet->fid, tstrerror(code));
753
    code = 0;
×
754
    goto _exit;
×
755
  }
756

757
  code = tsdbDataFileReadBrinBlk(reader, &brinBlkArray);
×
758
  if (code != 0) {
×
759
    tsdbWarn("vgId:%d fid:%d failed to reread brin index for head-only rebuild since %s, fallback to drop", vgId,
×
760
             pFileSet->fid, tstrerror(code));
761
    code = 0;
×
762
    goto _exit;
×
763
  }
764

765
  for (int32_t i = 0; i < TARRAY2_SIZE(brinBlkArray); ++i) {
×
766
    const SBrinBlk *pBrinBlk = TARRAY2_GET_PTR(brinBlkArray, i);
×
767
    SBrinBlock      brinBlock = {0};
×
768

769
    code = tBrinBlockInit(&brinBlock);
×
770
    if (code != 0) {
×
771
      tsdbError("vgId:%d fid:%d failed to init brin block during head-only rebuild since %s, code:%d", vgId,
×
772
                pFileSet->fid, tstrerror(code), code);
773
      goto _exit;
×
774
    }
775

776
    code = tsdbDataFileReadBrinBlock(reader, pBrinBlk, &brinBlock);
×
777
    if (code != 0) {
×
778
      tsdbWarn("vgId:%d fid:%d skip damaged brin block #%d during head-only rebuild since %s", vgId, pFileSet->fid, i,
×
779
               tstrerror(code));
780
      code = 0;
×
781
      tBrinBlockDestroy(&brinBlock);
×
782
      continue;
×
783
    }
784

785
    for (int32_t iRecord = 0; iRecord < BRIN_BLOCK_SIZE(&brinBlock); ++iRecord) {
×
786
      SBrinRecord record = {0};
×
787

788
      code = tBrinBlockGet(&brinBlock, iRecord, &record);
×
789
      if (code != 0) {
×
790
        tsdbWarn("vgId:%d fid:%d stop scanning damaged brin record in block #%d during head-only rebuild since %s",
×
791
                 vgId, pFileSet->fid, i, tstrerror(code));
792
        code = 0;
×
793
        break;
×
794
      }
795

796
      tBlockDataClear(&blockData);
×
797
      code = tsdbDataFileReadBlockData(reader, &record, &blockData);
×
798
      if (code != 0 || !tsdbRepairDataBlockLooksValid(&blockData, &record)) {
×
799
        tsdbTrace("vgId:%d fid:%d skip invalid core block during head-only rebuild, blockOffset:%" PRId64 " reason:%s",
×
800
                  vgId, pFileSet->fid, record.blockOffset, code != 0 ? tstrerror(code) : "invalid_block_payload");
801
        code = 0;
×
802
        tBlockDataClear(&blockData);
×
803
        continue;
×
804
      }
805

806
      if (keepSma && record.smaSize > 0) {
×
807
        code = tsdbDataFileReadBlockSma(reader, &record, columnDataAggArray);
×
808
        if (code != 0) {
×
809
          keepSma = false;
×
810
          tsdbWarn("vgId:%d fid:%d disable sma references during head-only rebuild after blockOffset:%" PRId64
×
811
                   " since %s",
812
                   vgId, pFileSet->fid, record.blockOffset, tstrerror(code));
813
          code = 0;
×
814
        }
815
      }
816

817
      code = TARRAY2_APPEND(recordArr, record);
×
818
      tBlockDataClear(&blockData);
×
819
      if (code != 0) {
×
820
        tBrinBlockDestroy(&brinBlock);
×
821
        goto _exit;
×
822
      }
823

824
      (*rebuiltBlocks)++;
×
825
    }
826

827
    tBrinBlockDestroy(&brinBlock);
×
828
    if (code != 0) {
×
829
      goto _exit;
×
830
    }
831
  }
832

833
  if (!keepSma) {
×
834
    for (int32_t i = 0; i < TARRAY2_SIZE(recordArr); ++i) {
×
835
      SBrinRecord *record = TARRAY2_GET_PTR(recordArr, i);
×
836
      SBrinRecord  adjusted = {0};
×
837

838
      tsdbRepairBuildHeadOnlyBrinRecord(record, false, &adjusted);
×
839
      *record = adjusted;
×
840
    }
841
  }
842

843
  *dropSma = hasReadableSma && !keepSma;
×
844

845
_exit:
×
846
  if (code == 0) {
×
847
    tsdbInfo("vgId:%d fid:%d collected %d head-only rebuild records, dropSma:%s", vgId, pFileSet->fid,
×
848
             *rebuiltBlocks, *dropSma ? "yes" : "no");
849
  } else {
850
    tsdbError("vgId:%d fid:%d failed to collect head-only rebuild records since %s, code:%d", vgId, pFileSet->fid,
×
851
              tstrerror(code), code);
852
  }
853
  TARRAY2_DESTROY(columnDataAggArray, NULL);
×
854
  tBlockDataDestroy(&blockData);
×
855
  tsdbDataFileReaderClose(&reader);
×
856
  return code;
×
857
}
858

859
static int32_t tsdbRepairWriteHeadOnlyCore(STFileSystem *pFS, const STFileSet *pFileSet,
×
860
                                           const TBrinRecordArray *recordArr, TFileOpArray *writerOps) {
861
  int32_t                  code = 0;
×
862
  int32_t                  vgId = TD_VID(pFS->tsdb->pVnode);
×
863
  int64_t                  headFileSize = 0;
×
864
  SDiskID                  diskId = {0};
×
865
  SDataFileRAWWriter      *writer = NULL;
×
866
  SBrinBlock               brinBlock = {0};
×
867
  TBrinBlkArray            brinBlkArray[1];
×
868
  SBuffer                  buffers[3];
×
869
  SHeadFooter              headFooter = {0};
×
870
  SVersionRange            range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
×
871
  SEncryptData            *pEncryptData = &(pFS->tsdb->pVnode->config.tsdbCfg.encryptData);
×
872
  SDataFileRAWWriterConfig writerConfig = {
×
873
      .tsdb = pFS->tsdb,
×
874
      .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
×
875
      .fid = pFileSet->fid,
×
876
      .cid = tsdbFSAllocEid(pFS),
×
877
  };
878

879
  tsdbInfo("vgId:%d fid:%d start writing head-only core, records:%d", vgId, pFileSet->fid, TARRAY2_SIZE(recordArr));
×
880

881
  code = tsdbAllocateDisk(pFS->tsdb, tsdbFTypeLabel(TSDB_FTYPE_HEAD),
×
882
                          tsdbFidLevel(pFileSet->fid, &pFS->tsdb->keepCfg, taosGetTimestampSec()), &diskId);
×
883
  if (code != 0) {
×
884
    tsdbError("vgId:%d fid:%d failed to allocate disk for head-only rebuild since %s, code:%d", vgId, pFileSet->fid,
×
885
              tstrerror(code), code);
886
    return code;
×
887
  }
888

889
  writerConfig.file = (STFile){
×
890
      .type = TSDB_FTYPE_HEAD,
891
      .did = diskId,
892
      .fid = pFileSet->fid,
×
893
      .cid = writerConfig.cid,
×
894
      .size = 0,
895
      .minVer = VERSION_MAX,
896
      .maxVer = VERSION_MIN,
897
  };
898

899
  TARRAY2_INIT(brinBlkArray);
×
900
  for (int32_t i = 0; i < 3; ++i) {
×
901
    tBufferInit(&buffers[i]);
×
902
  }
903

904
  code = tBrinBlockInit(&brinBlock);
×
905
  if (code != 0) {
×
906
    goto _exit;
×
907
  }
908

909
  code = tsdbDataFileRAWWriterOpen(&writerConfig, &writer);
×
910
  if (code != 0) {
×
911
    tsdbError("vgId:%d fid:%d failed to open head-only writer since %s, code:%d", vgId, pFileSet->fid, tstrerror(code),
×
912
              code);
913
    goto _exit;
×
914
  }
915

916
  int32_t flushThreshold = tsdbRepairResolveHeadOnlyBrinFlushThreshold(pFS->tsdb->pVnode->config.tsdbCfg.maxRows);
×
917
  for (int32_t i = 0; i < TARRAY2_SIZE(recordArr); ++i) {
×
918
    const SBrinRecord *record = TARRAY2_GET_PTR(recordArr, i);
×
919

920
    if (tsdbRepairShouldFlushHeadOnlyBrinBlock(brinBlock.numOfRecords, flushThreshold)) {
×
921
      code = tsdbFileWriteBrinBlock(writer->fd, &brinBlock, pFS->tsdb->pVnode->config.tsdbCfg.compression,
×
922
                                    &writer->file.size, brinBlkArray, buffers, &range, pEncryptData);
×
923
      if (code != 0) {
×
924
        goto _exit;
×
925
      }
926
    }
927

928
    code = tBrinBlockPut(&brinBlock, record);
×
929
    if (tsdbRepairShouldRetryHeadOnlyBrinPut(code, brinBlock.numOfRecords, flushThreshold)) {
×
930
      code = tsdbFileWriteBrinBlock(writer->fd, &brinBlock, pFS->tsdb->pVnode->config.tsdbCfg.compression,
×
931
                                    &writer->file.size, brinBlkArray, buffers, &range, pEncryptData);
×
932
      if (code != 0) {
×
933
        goto _exit;
×
934
      }
935

936
      code = tBrinBlockPut(&brinBlock, record);
×
937
    }
938

939
    if (code != 0) {
×
940
      goto _exit;
×
941
    }
942
  }
943

944
  code = tsdbFileWriteBrinBlock(writer->fd, &brinBlock, pFS->tsdb->pVnode->config.tsdbCfg.compression,
×
945
                                &writer->file.size, brinBlkArray, buffers, &range, pEncryptData);
×
946
  if (code != 0) {
×
947
    goto _exit;
×
948
  }
949

950
  code = tsdbFileWriteBrinBlk(writer->fd, brinBlkArray, headFooter.brinBlkPtr, &writer->file.size, pEncryptData);
×
951
  if (code != 0) {
×
952
    goto _exit;
×
953
  }
954

955
  code = tsdbFileWriteHeadFooter(writer->fd, &writer->file.size, &headFooter, pEncryptData);
×
956
  if (code != 0) {
×
957
    goto _exit;
×
958
  }
959

960
  if (range.minVer <= range.maxVer) {
×
961
    writer->file.minVer = range.minVer;
×
962
    writer->file.maxVer = range.maxVer;
×
963
  }
964

965
_exit:
×
966
  if (writer != NULL) {
×
967
    headFileSize = writer->file.size;
×
968
  }
969
  if (writer != NULL) {
×
970
    int32_t closeCode = tsdbDataFileRAWWriterClose(&writer, code != 0, code == 0 ? writerOps : NULL);
×
971
    if (closeCode != 0 && code == 0) {
×
972
      code = closeCode;
×
973
    }
974
  }
975
  if (code == 0) {
×
976
    tsdbInfo("vgId:%d fid:%d finished writing head-only core, brinBlocks:%d fileSize:%" PRId64 " writerOps:%d", vgId,
×
977
             pFileSet->fid, TARRAY2_SIZE(brinBlkArray), headFileSize, TARRAY2_SIZE(writerOps));
978
  } else {
979
    tsdbError("vgId:%d fid:%d failed to write head-only core since %s, code:%d", vgId, pFileSet->fid, tstrerror(code),
×
980
              code);
981
  }
982
  tBrinBlockDestroy(&brinBlock);
×
983
  TARRAY2_DESTROY(brinBlkArray, NULL);
×
984
  for (int32_t i = 0; i < 3; ++i) {
×
985
    tBufferDestroy(&buffers[i]);
×
986
  }
987
  return code;
×
988
}
989

990
// Head-only rebuild of a core group: collect the brin records that are still valid, then write
// them into a new head file.
//
// `*rebuiltBlocks` and `*dropSma` are filled in by the collection step. When no record survives,
// nothing is written and 0 is returned with `*rebuiltBlocks == 0`, leaving the decision to the caller.
static int32_t tsdbRepairRebuildCoreHeadOnly(STFileSystem *pFS, const STFileSet *pFileSet, TFileOpArray *writerOps,
                                             bool *dropSma, int32_t *rebuiltBlocks) {
  int32_t          vid = TD_VID(pFS->tsdb->pVnode);
  TBrinRecordArray kept[1];

  TARRAY2_INIT(kept);

  int32_t ret = tsdbRepairCollectHeadOnlyRecords(pFS, pFileSet, kept, dropSma, rebuiltBlocks);
  if (ret == 0) {
    if (*rebuiltBlocks == 0) {
      tsdbWarn("vgId:%d fid:%d head-only rebuild found no valid records to preserve", vid, pFileSet->fid);
    } else {
      ret = tsdbRepairWriteHeadOnlyCore(pFS, pFileSet, kept, writerOps);
    }
  }

  if (ret == 0 && *rebuiltBlocks > 0) {
    tsdbInfo("vgId:%d fid:%d head-only rebuild prepared successfully, rebuiltBlocks:%d dropSma:%s writerOps:%d", vid,
             pFileSet->fid, *rebuiltBlocks, *dropSma ? "yes" : "no", TARRAY2_SIZE(writerOps));
  }
  TARRAY2_DESTROY(kept, NULL);
  return ret;
}
1016

1017
static int32_t tsdbRepairApplyCoreResult(STFileSystem *pFS, const STFileSet *pFileSet, ETsdbRepairMode mode,
×
1018
                                         const STsdbRepairCoreResult *result, TFileOpArray *opArr) {
1019
  int32_t      code = 0;
×
1020
  int32_t      vgId = TD_VID(pFS->tsdb->pVnode);
×
1021
  int32_t      rebuiltBlocks = 0;
×
1022
  bool         dropSma = false;
×
1023
  TFileOpArray writerOps[1];
×
1024

1025
  TARRAY2_INIT(writerOps);
×
1026

1027
  if (result->action == TSDB_REPAIR_ACTION_KEEP) {
×
1028
    tsdbDebug("vgId:%d fid:%d keep core group unchanged", vgId, pFileSet->fid);
×
1029
    goto _exit;
×
1030
  }
1031

1032
  if (result->action == TSDB_REPAIR_ACTION_DROP) {
×
1033
    tsdbInfo("vgId:%d fid:%d drop core group, reason:%s", vgId, pFileSet->fid,
×
1034
             result->reason == NULL ? "damaged_core_group" : result->reason);
1035
    code = tsdbRepairAppendRemoveCoreOps(pFileSet, opArr);
×
1036
    if (code != 0) {
×
1037
      tsdbError("vgId:%d fid:%d failed to append remove ops for core group since %s, code:%d", vgId, pFileSet->fid,
×
1038
                tstrerror(code), code);
1039
    }
1040
    goto _exit;
×
1041
  }
1042

1043
  tsdbInfo("vgId:%d fid:%d rebuild core group, mode:%s keptBlocks:%d droppedBlocks:%d reason:%s", vgId, pFileSet->fid,
×
1044
           tsdbRepairModeName(mode), result->keptBlocks, result->droppedBlocks,
1045
           result->reason == NULL ? "damaged_core_group" : result->reason);
1046

1047
  if (mode == TSDB_REPAIR_MODE_HEAD_ONLY_REBUILD) {
×
1048
    code = tsdbRepairRebuildCoreHeadOnly(pFS, pFileSet, writerOps, &dropSma, &rebuiltBlocks);
×
1049
  } else {
1050
    code = tsdbRepairRebuildCore(pFS, pFileSet, writerOps, &rebuiltBlocks);
×
1051
  }
1052
  if (code != 0) {
×
1053
    goto _exit;
×
1054
  }
1055

1056
  if (rebuiltBlocks == 0) {
×
1057
    tsdbWarn("vgId:%d fid:%d rebuilt core group has no valid blocks left, fallback to drop", vgId, pFileSet->fid);
×
1058
    code = tsdbRepairAppendRemoveCoreOps(pFileSet, opArr);
×
1059
    if (code != 0) {
×
1060
      tsdbError("vgId:%d fid:%d failed to append fallback remove ops for core group since %s, code:%d", vgId,
×
1061
                pFileSet->fid, tstrerror(code), code);
1062
    }
1063
    goto _exit;
×
1064
  }
1065

1066
  if (mode == TSDB_REPAIR_MODE_HEAD_ONLY_REBUILD) {
×
1067
    code = tsdbRepairAppendHeadOnlyRebuildOps(pFileSet, dropSma, writerOps, opArr);
×
1068
    if (code == 0) {
×
1069
      tsdbInfo("vgId:%d fid:%d prepared head-only rebuild ops, rebuiltBlocks:%d dropSma:%s appendedWriterOps:%d", vgId,
×
1070
               pFileSet->fid, rebuiltBlocks, dropSma ? "yes" : "no", TARRAY2_SIZE(writerOps));
1071
    }
1072
  } else {
1073
    code = tsdbRepairAppendRemoveCoreOps(pFileSet, opArr);
×
1074
    if (code != 0) {
×
1075
      tsdbError("vgId:%d fid:%d failed to append remove ops before core rebuild commit since %s, code:%d", vgId,
×
1076
                pFileSet->fid, tstrerror(code), code);
1077
      goto _exit;
×
1078
    }
1079

1080
    code = tsdbRepairAppendOps(opArr, writerOps);
×
1081
  }
1082
  if (code != 0) {
×
1083
    tsdbError("vgId:%d fid:%d failed to append rebuilt core ops since %s, code:%d", vgId, pFileSet->fid,
×
1084
              tstrerror(code), code);
1085
    goto _exit;
×
1086
  }
1087

1088
  tsdbDebug("vgId:%d fid:%d appended %d rebuilt core ops", vgId, pFileSet->fid, TARRAY2_SIZE(writerOps));
×
1089

1090
_exit:
×
1091
  TARRAY2_DESTROY(writerOps, NULL);
×
1092
  return code;
×
1093
}
1094

1095
static int32_t tsdbDeepScanAndFixDataPart(STFileSystem *pFS, const STFileSet *pFileSet, ETsdbRepairMode mode,
×
1096
                                          TFileOpArray *opArr) {
1097
  int32_t               code = 0;
×
1098
  STsdbRepairCoreResult result;
×
1099

1100
  tsdbRepairCoreResultInit(&result);
×
1101

1102
  code = tsdbRepairAnalyzeCore(pFS, pFileSet, &result);
×
1103
  if (code != 0) {
×
1104
    tsdbError("vgId:%d fid:%d failed to analyze core group since %s, code:%d", TD_VID(pFS->tsdb->pVnode), pFileSet->fid,
×
1105
              tstrerror(code), code);
1106
    return code;
×
1107
  }
1108

1109
  return tsdbRepairApplyCoreResult(pFS, pFileSet, mode, &result, opArr);
×
1110
}
1111

1112
static int32_t tsdbRepairAnalyzeSttFile(STFileSystem *pFS, const STFileSet *pFileSet, const STFileObj *pStt,
×
1113
                                        STsdbRepairSttResult *result) {
1114
  int32_t                code = 0;
×
1115
  int32_t                vgId = TD_VID(pFS->tsdb->pVnode);
×
1116
  SSttFileReader        *reader = NULL;
×
1117
  SBlockData             blockData = {0};
×
1118
  STombBlock             tombBlock = {0};
×
1119
  bool                   tombBlockInit = false;
×
1120
  const TSttBlkArray    *sttBlkArray = NULL;
×
1121
  const TStatisBlkArray *statisBlkArray = NULL;
×
1122
  const TTombBlkArray   *tombBlkArray = NULL;
×
1123
  SSttFileReaderConfig   readerConfig = {
×
1124
        .tsdb = pFS->tsdb,
×
1125
        .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
×
1126
        .file[0] = pStt->f[0],
1127
  };
1128

1129
  if (pStt == NULL || tsdbRepairFileAffected(pStt)) {
×
1130
    tsdbTrace("vgId:%d fid:%d skip stt deep scan because file is already affected or absent", vgId, pFileSet->fid);
×
1131
    return 0;
×
1132
  }
1133

1134
  tsdbDebug("vgId:%d fid:%d level:%d start analyzing stt file:%s", vgId, pFileSet->fid, pStt->f->stt->level,
×
1135
            pStt->fname);
1136

1137
  code = tsdbSttFileReaderOpen(pStt->fname, &readerConfig, &reader);
×
1138
  if (code != 0) {
×
1139
    result->action = TSDB_REPAIR_ACTION_DROP;
×
1140
    tsdbRepairSetReason(&result->reason, "open_stt_reader_failed");
×
1141
    tsdbWarn("vgId:%d fid:%d level:%d failed to open stt file:%s, downgrade action to %s since %s", vgId, pFileSet->fid,
×
1142
             pStt->f->stt->level, pStt->fname, tsdbRepairActionName(result->action), tstrerror(code));
1143
    code = 0;
×
1144
    goto _exit;
×
1145
  }
1146

1147
  code = tsdbSttFileReadSttBlk(reader, &sttBlkArray);
×
1148
  if (code != 0) {
×
1149
    result->action = TSDB_REPAIR_ACTION_DROP;
×
1150
    tsdbRepairSetReason(&result->reason, "read_stt_index_failed");
×
1151
    tsdbWarn("vgId:%d fid:%d level:%d failed to read stt index from file:%s, downgrade action to %s since %s", vgId,
×
1152
             pFileSet->fid, pStt->f->stt->level, pStt->fname, tsdbRepairActionName(result->action), tstrerror(code));
1153
    code = 0;
×
1154
    goto _exit;
×
1155
  }
1156

1157
  code = tsdbSttFileReadStatisBlk(reader, &statisBlkArray);
×
1158
  if (code != 0) {
×
1159
    result->rewriteRequired = true;
×
1160
    tsdbRepairSetReason(&result->reason, "read_statis_index_failed");
×
1161
    tsdbWarn("vgId:%d fid:%d level:%d failed to read statis index from file:%s since %s, mark file for rebuild", vgId,
×
1162
             pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code));
1163
    code = 0;
×
1164
  }
1165
  TAOS_UNUSED(statisBlkArray);
1166

1167
  for (int32_t i = 0; i < TARRAY2_SIZE(sttBlkArray); ++i) {
×
1168
    const SSttBlk *pSttBlk = TARRAY2_GET_PTR(sttBlkArray, i);
×
1169

1170
    tBlockDataClear(&blockData);
×
1171
    code = tsdbSttFileReadBlockData(reader, pSttBlk, &blockData);
×
1172
    if (code != 0 || !tsdbRepairSttBlockLooksValid(&blockData)) {
×
1173
      result->rewriteRequired = true;
×
1174
      result->droppedDataBlocks++;
×
1175
      tsdbRepairSetReason(&result->reason, code != 0 ? "read_stt_block_failed" : "invalid_stt_block");
×
1176
      tsdbWarn("vgId:%d fid:%d level:%d drop damaged stt block from file:%s blockOffset:%" PRId64 " reason:%s", vgId,
×
1177
               pFileSet->fid, pStt->f->stt->level, pStt->fname, pSttBlk->bInfo.offset,
1178
               code != 0 ? tstrerror(code) : "invalid_block_payload");
1179
      code = 0;
×
1180
      tBlockDataClear(&blockData);
×
1181
      continue;
×
1182
    }
1183

1184
    result->keptDataBlocks++;
×
1185
    tsdbTrace("vgId:%d fid:%d level:%d keep stt data block from file:%s blockOffset:%" PRId64, vgId, pFileSet->fid,
×
1186
              pStt->f->stt->level, pStt->fname, pSttBlk->bInfo.offset);
1187
    tBlockDataClear(&blockData);
×
1188
  }
1189

1190
  code = tsdbSttFileReadTombBlk(reader, &tombBlkArray);
×
1191
  if (code != 0) {
×
1192
    result->rewriteRequired = true;
×
1193
    result->droppedTombBlocks++;
×
1194
    tsdbRepairSetReason(&result->reason, "read_tomb_index_failed");
×
1195
    tsdbWarn("vgId:%d fid:%d level:%d failed to read tomb index from stt file:%s since %s, continue without tomb data",
×
1196
             vgId, pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code));
1197
    code = 0;
×
1198
    goto _exit;
×
1199
  }
1200

1201
  tTombBlockInit(&tombBlock);
×
1202
  tombBlockInit = true;
×
1203

1204
  for (int32_t i = 0; i < TARRAY2_SIZE(tombBlkArray); ++i) {
×
1205
    const STombBlk *pTombBlk = TARRAY2_GET_PTR(tombBlkArray, i);
×
1206

1207
    code = tsdbSttFileReadTombBlock(reader, pTombBlk, &tombBlock);
×
1208
    if (code != 0) {
×
1209
      result->rewriteRequired = true;
×
1210
      result->droppedTombBlocks++;
×
1211
      tsdbRepairSetReason(&result->reason, "read_tomb_block_failed");
×
1212
      tsdbWarn("vgId:%d fid:%d level:%d drop damaged tomb block from stt file:%s since %s", vgId, pFileSet->fid,
×
1213
               pStt->f->stt->level, pStt->fname, tstrerror(code));
1214
      code = 0;
×
1215
      continue;
×
1216
    }
1217

1218
    if (TOMB_BLOCK_SIZE(&tombBlock) > 0) {
×
1219
      result->keptTombBlocks++;
×
1220
      tsdbTrace("vgId:%d fid:%d level:%d keep tomb block from stt file:%s numRecords:%d", vgId, pFileSet->fid,
×
1221
                pStt->f->stt->level, pStt->fname, TOMB_BLOCK_SIZE(&tombBlock));
1222
    }
1223
  }
1224

1225
_exit:
×
1226
  if (code == 0) {
×
1227
    tsdbRepairFinalizeSttResult(result);
×
1228
    tsdbDebug(
×
1229
        "vgId:%d fid:%d level:%d stt analysis finished, file:%s action:%s keptData:%d droppedData:%d "
1230
        "keptTomb:%d droppedTomb:%d reason:%s",
1231
        vgId, pFileSet->fid, pStt->f->stt->level, pStt->fname, tsdbRepairActionName(result->action),
1232
        result->keptDataBlocks, result->droppedDataBlocks, result->keptTombBlocks, result->droppedTombBlocks,
1233
        result->reason == NULL ? "healthy" : result->reason);
1234
  }
1235

1236
  tBlockDataDestroy(&blockData);
×
1237
  if (tombBlockInit) {
×
1238
    tTombBlockDestroy(&tombBlock);
×
1239
  }
1240
  tsdbSttFileReaderClose(&reader);
×
1241
  return code;
×
1242
}
1243

1244
static int32_t tsdbRepairRebuildSttFile(STFileSystem *pFS, const STFileSet *pFileSet, const STFileObj *pStt,
×
1245
                                        TFileOpArray *writerOps, int32_t *rebuiltDataBlocks,
1246
                                        int32_t *rebuiltTombBlocks) {
1247
  int32_t              code = 0;
×
1248
  int32_t              vgId = TD_VID(pFS->tsdb->pVnode);
×
1249
  SSttFileReader      *reader = NULL;
×
1250
  SSttFileWriter      *writer = NULL;
×
1251
  SBlockData           blockData = {0};
×
1252
  STombBlock           tombBlock = {0};
×
1253
  bool                 tombBlockInit = false;
×
1254
  const TSttBlkArray  *sttBlkArray = NULL;
×
1255
  const TTombBlkArray *tombBlkArray = NULL;
×
1256
  SSttFileReaderConfig readerConfig = {
×
1257
      .tsdb = pFS->tsdb,
×
1258
      .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
×
1259
      .file[0] = pStt->f[0],
1260
  };
1261
  SSttFileWriterConfig writerConfig = {
×
1262
      .tsdb = pFS->tsdb,
×
1263
      .maxRow = pFS->tsdb->pVnode->config.tsdbCfg.maxRows,
×
1264
      .szPage = pFS->tsdb->pVnode->config.tsdbPageSize,
×
1265
      .cmprAlg = pFS->tsdb->pVnode->config.tsdbCfg.compression,
×
1266
      .compactVersion = INT64_MAX,
1267
      .expLevel = tsdbFidLevel(pFileSet->fid, &pFS->tsdb->keepCfg, taosGetTimestampSec()),
×
1268
      .fid = pFileSet->fid,
×
1269
      .cid = tsdbFSAllocEid(pFS),
×
1270
      .level = pStt->f->stt->level,
×
1271
  };
1272

1273
  code = tsdbSttFileReaderOpen(pStt->fname, &readerConfig, &reader);
×
1274
  if (code != 0) {
×
1275
    tsdbWarn("vgId:%d fid:%d level:%d failed to reopen stt file:%s for rebuild since %s, fallback to drop", vgId,
×
1276
             pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code));
1277
    code = 0;
×
1278
    goto _exit;
×
1279
  }
1280

1281
  code = tsdbSttFileReadSttBlk(reader, &sttBlkArray);
×
1282
  if (code != 0) {
×
1283
    tsdbWarn("vgId:%d fid:%d level:%d failed to reread stt index from file:%s for rebuild since %s, fallback to drop",
×
1284
             vgId, pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code));
1285
    code = 0;
×
1286
    goto _exit;
×
1287
  }
1288

1289
  code = tsdbSttFileWriterOpen(&writerConfig, &writer);
×
1290
  if (code != 0) {
×
1291
    tsdbError("vgId:%d fid:%d level:%d failed to open stt writer for file:%s since %s, code:%d", vgId, pFileSet->fid,
×
1292
              pStt->f->stt->level, pStt->fname, tstrerror(code), code);
1293
    goto _exit;
×
1294
  }
1295

1296
  for (int32_t i = 0; i < TARRAY2_SIZE(sttBlkArray); ++i) {
×
1297
    const SSttBlk *pSttBlk = TARRAY2_GET_PTR(sttBlkArray, i);
×
1298

1299
    tBlockDataClear(&blockData);
×
1300
    code = tsdbSttFileReadBlockData(reader, pSttBlk, &blockData);
×
1301
    if (code != 0 || !tsdbRepairSttBlockLooksValid(&blockData)) {
×
1302
      tsdbTrace("vgId:%d fid:%d level:%d skip invalid stt data block during rebuild, file:%s blockOffset:%" PRId64
×
1303
                " reason:%s",
1304
                vgId, pFileSet->fid, pStt->f->stt->level, pStt->fname, pSttBlk->bInfo.offset,
1305
                code != 0 ? tstrerror(code) : "invalid_block_payload");
1306
      code = 0;
×
1307
      tBlockDataClear(&blockData);
×
1308
      continue;
×
1309
    }
1310

1311
    code = tsdbSttFileWriteBlockData(writer, &blockData);
×
1312
    tBlockDataClear(&blockData);
×
1313
    if (code != 0) {
×
1314
      tsdbError("vgId:%d fid:%d level:%d failed to write rebuilt stt data block for file:%s since %s, code:%d", vgId,
×
1315
                pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code), code);
1316
      goto _exit;
×
1317
    }
1318

1319
    (*rebuiltDataBlocks)++;
×
1320
    tsdbTrace("vgId:%d fid:%d level:%d rebuilt stt data block for file:%s total:%d", vgId, pFileSet->fid,
×
1321
              pStt->f->stt->level, pStt->fname, *rebuiltDataBlocks);
1322
  }
1323

1324
  code = tsdbSttFileReadTombBlk(reader, &tombBlkArray);
×
1325
  if (code != 0) {
×
1326
    tsdbWarn(
×
1327
        "vgId:%d fid:%d level:%d failed to reread tomb index from file:%s during rebuild since %s, continue "
1328
        "without tomb data",
1329
        vgId, pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code));
1330
    code = 0;
×
1331
    goto _exit;
×
1332
  }
1333

1334
  tTombBlockInit(&tombBlock);
×
1335
  tombBlockInit = true;
×
1336

1337
  for (int32_t i = 0; i < TARRAY2_SIZE(tombBlkArray); ++i) {
×
1338
    const STombBlk *pTombBlk = TARRAY2_GET_PTR(tombBlkArray, i);
×
1339

1340
    code = tsdbSttFileReadTombBlock(reader, pTombBlk, &tombBlock);
×
1341
    if (code != 0) {
×
1342
      tsdbTrace("vgId:%d fid:%d level:%d skip invalid tomb block during rebuild, file:%s reason:%s", vgId,
×
1343
                pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code));
1344
      code = 0;
×
1345
      continue;
×
1346
    }
1347

1348
    for (int32_t j = 0; j < TOMB_BLOCK_SIZE(&tombBlock); ++j) {
×
1349
      STombRecord record = {0};
×
1350

1351
      code = tTombBlockGet(&tombBlock, j, &record);
×
1352
      if (code != 0) {
×
1353
        tsdbTrace("vgId:%d fid:%d level:%d stop scanning invalid tomb record during rebuild for file:%s since %s", vgId,
×
1354
                  pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code));
1355
        code = 0;
×
1356
        break;
×
1357
      }
1358

1359
      code = tsdbSttFileWriteTombRecord(writer, &record);
×
1360
      if (code != 0) {
×
1361
        tsdbError("vgId:%d fid:%d level:%d failed to write rebuilt tomb record for file:%s since %s, code:%d", vgId,
×
1362
                  pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code), code);
1363
        goto _exit;
×
1364
      }
1365

1366
      if (j == 0) {
×
1367
        (*rebuiltTombBlocks)++;
×
1368
      }
1369
    }
1370
  }
1371

1372
_exit:
×
1373
  tBlockDataDestroy(&blockData);
×
1374
  if (writer != NULL) {
×
1375
    int8_t  abort = tsdbRepairShouldAbortSttWriterClose(*rebuiltDataBlocks, *rebuiltTombBlocks);
×
1376
    int32_t closeCode = tsdbSttFileWriterClose(&writer, abort, abort ? NULL : writerOps);
×
1377
    if (closeCode != 0) {
×
1378
      tsdbError("vgId:%d fid:%d level:%d failed to close rebuilt stt writer for file:%s since %s, code:%d", vgId,
×
1379
                pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(closeCode), closeCode);
1380
      code = code == 0 ? closeCode : code;
×
1381
    }
1382
  }
1383
  // tsdbSttFileWriterClose(&writer, true, NULL);
1384
  if (tombBlockInit) {
×
1385
    tTombBlockDestroy(&tombBlock);
×
1386
  }
1387
  tsdbSttFileReaderClose(&reader);
×
1388
  return code;
×
1389
}
1390

1391
static int32_t tsdbRepairApplySttResult(STFileSystem *pFS, const STFileSet *pFileSet, const STFileObj *pStt,
×
1392
                                        const STsdbRepairSttResult *result, TFileOpArray *opArr) {
1393
  int32_t      code = 0;
×
1394
  int32_t      vgId = TD_VID(pFS->tsdb->pVnode);
×
1395
  int32_t      rebuiltDataBlocks = 0;
×
1396
  int32_t      rebuiltTombBlocks = 0;
×
1397
  TFileOpArray writerOps[1];
×
1398

1399
  TARRAY2_INIT(writerOps);
×
1400

1401
  if (result->action == TSDB_REPAIR_ACTION_KEEP) {
×
1402
    tsdbDebug("vgId:%d fid:%d level:%d keep stt file unchanged:%s", vgId, pFileSet->fid, pStt->f->stt->level,
×
1403
              pStt->fname);
1404
    goto _exit;
×
1405
  }
1406

1407
  if (result->action == TSDB_REPAIR_ACTION_DROP) {
×
1408
    tsdbInfo("vgId:%d fid:%d level:%d drop stt file:%s reason:%s", vgId, pFileSet->fid, pStt->f->stt->level,
×
1409
             pStt->fname, result->reason == NULL ? "damaged_stt_file" : result->reason);
1410
    code = tsdbRepairAppendRemoveFileObjOp(opArr, pFileSet->fid, pStt);
×
1411
    if (code != 0) {
×
1412
      tsdbError("vgId:%d fid:%d level:%d failed to append remove op for stt file:%s since %s, code:%d", vgId,
×
1413
                pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code), code);
1414
    }
1415
    goto _exit;
×
1416
  }
1417

1418
  tsdbInfo(
×
1419
      "vgId:%d fid:%d level:%d rebuild stt file:%s keptData:%d droppedData:%d keptTomb:%d droppedTomb:%d "
1420
      "reason:%s",
1421
      vgId, pFileSet->fid, pStt->f->stt->level, pStt->fname, result->keptDataBlocks, result->droppedDataBlocks,
1422
      result->keptTombBlocks, result->droppedTombBlocks, result->reason == NULL ? "damaged_stt_file" : result->reason);
1423

1424
  code = tsdbRepairRebuildSttFile(pFS, pFileSet, pStt, writerOps, &rebuiltDataBlocks, &rebuiltTombBlocks);
×
1425
  if (code != 0) {
×
1426
    goto _exit;
×
1427
  }
1428

1429
  if (rebuiltDataBlocks == 0 && rebuiltTombBlocks == 0) {
×
1430
    tsdbWarn("vgId:%d fid:%d level:%d rebuilt stt file:%s has no valid content left, fallback to drop", vgId,
×
1431
             pFileSet->fid, pStt->f->stt->level, pStt->fname);
1432
    code = tsdbRepairAppendRemoveFileObjOp(opArr, pFileSet->fid, pStt);
×
1433
    if (code != 0) {
×
1434
      tsdbError("vgId:%d fid:%d level:%d failed to append fallback remove op for stt file:%s since %s, code:%d", vgId,
×
1435
                pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code), code);
1436
    }
1437
    goto _exit;
×
1438
  }
1439

1440
  code = tsdbRepairAppendRemoveFileObjOp(opArr, pFileSet->fid, pStt);
×
1441
  if (code != 0) {
×
1442
    tsdbError(
×
1443
        "vgId:%d fid:%d level:%d failed to append remove op before stt rebuild commit for file:%s since %s, "
1444
        "code:%d",
1445
        vgId, pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code), code);
1446
    goto _exit;
×
1447
  }
1448

1449
  code = tsdbRepairAppendOps(opArr, writerOps);
×
1450
  if (code != 0) {
×
1451
    tsdbError("vgId:%d fid:%d level:%d failed to append rebuilt stt ops for file:%s since %s, code:%d", vgId,
×
1452
              pFileSet->fid, pStt->f->stt->level, pStt->fname, tstrerror(code), code);
1453
    goto _exit;
×
1454
  }
1455

1456
  tsdbDebug("vgId:%d fid:%d level:%d appended %d rebuilt stt ops for file:%s", vgId, pFileSet->fid, pStt->f->stt->level,
×
1457
            TARRAY2_SIZE(writerOps), pStt->fname);
1458

1459
_exit:
×
1460
  TARRAY2_DESTROY(writerOps, NULL);
×
1461
  return code;
×
1462
}
1463

1464
// Visit every stt file object of every stt level in `pFileSet`: analyze the
// file, then hand the analysis result to tsdbRepairApplySttResult() together
// with `opArr`.
//
// Returns 0 when all files were processed, otherwise the first non-zero code
// reported by the analyze or apply step (remaining files are not visited).
static int32_t tsdbDeepScanAndFixSttPart(STFileSystem *pFS, const STFileSet *pFileSet, TFileOpArray *opArr) {
  SSttLvl *pLvl = NULL;

  TARRAY2_FOREACH(pFileSet->lvlArr, pLvl) {
    STFileObj *pFile = NULL;

    TARRAY2_FOREACH(pLvl->fobjArr, pFile) {
      int32_t              code = 0;
      STsdbRepairSttResult sttResult;

      // Each file gets a freshly initialized result before analysis.
      tsdbRepairSttResultInit(&sttResult);

      code = tsdbRepairAnalyzeSttFile(pFS, pFileSet, pFile, &sttResult);
      if (code != 0) {
        tsdbError("vgId:%d fid:%d level:%d failed to analyze stt file:%s since %s, code:%d", TD_VID(pFS->tsdb->pVnode),
                  pFileSet->fid, pFile->f->stt->level, pFile->fname, tstrerror(code), code);
        return code;
      }

      // The apply step is not logged here on failure; its code is simply propagated.
      code = tsdbRepairApplySttResult(pFS, pFileSet, pFile, &sttResult, opArr);
      if (code != 0) {
        return code;
      }
    }
  }

  return 0;
}
1492

1493
static int32_t tsdbForceRepairFileSetDeepScanAndFix(STFileSystem *pFS, const STFileSet *pFileSet, ETsdbRepairMode mode,
×
1494
                                                    TFileOpArray *opArr) {
1495
  int32_t code = 0;
×
1496

1497
  code = tsdbDeepScanAndFixDataPart(pFS, pFileSet, mode, opArr);
×
1498
  if (code != 0) {
×
1499
    return code;
×
1500
  }
1501

1502
  return tsdbDeepScanAndFixSttPart(pFS, pFileSet, opArr);
×
1503
}
1504

1505
// Force-repair a single file set.
//
// File ops are first collected in a function-local array: bad-file detection
// always runs, and the deep scan/fix runs unless the resolved mode is
// TSDB_REPAIR_MODE_DROP_INVALID_ONLY. Only when both steps returned 0 and at
// least one op was collected are the ops appended to `opArr`; `*hasChange` is
// set to true after that append succeeds and is never written otherwise.
//
// Returns 0 on success or the first non-zero code encountered. The local op
// array is destroyed on every path.
static int32_t tsdbForceRepairFileSet(STFileSystem *pFS, const STFileSet *pFileSet, TFileOpArray *opArr,
                                      bool *hasChange) {
  int32_t           code = 0;
  int32_t           vgId = TD_VID(pFS->tsdb->pVnode);
  EDmRepairStrategy strategy = tsdbRepairGetStrategyForFileSet(vgId, pFileSet->fid);
  ETsdbRepairMode   mode = (ETsdbRepairMode)tsdbRepairResolveMode(strategy);
  TFileOpArray      stagedOps[1];

  TARRAY2_INIT(stagedOps);

  tsdbInfo("vgId:%d fid:%d start force repair for fileset, strategy:%s mode:%s", vgId, pFileSet->fid,
           tsdbRepairStrategyName(strategy), tsdbRepairModeName(mode));

  code = tsdbForceRepairFileSetBadFiles(pFS, pFileSet, stagedOps);
  if (code != 0) {
    tsdbError("vgId:%d fid:%d failed during bad-file detection since %s, code:%d", vgId, pFileSet->fid, tstrerror(code),
              code);
    goto _exit;
  }

  if (mode == TSDB_REPAIR_MODE_DROP_INVALID_ONLY) {
    tsdbDebug("vgId:%d fid:%d skip deep scan because strategy:%s only drops invalid files", vgId, pFileSet->fid,
              tsdbRepairStrategyName(strategy));
  } else {
    code = tsdbForceRepairFileSetDeepScanAndFix(pFS, pFileSet, mode, stagedOps);
    if (code != 0) {
      tsdbError("vgId:%d fid:%d failed during deep scan/apply since %s, code:%d", vgId, pFileSet->fid, tstrerror(code),
                code);
      goto _exit;
    }
  }

  if (TARRAY2_SIZE(stagedOps) != 0) {
    // Publish the staged ops to the caller's array; flag the change only on success.
    code = tsdbRepairAppendOps(opArr, stagedOps);
    if (code == 0) {
      *hasChange = true;
      tsdbInfo("vgId:%d fid:%d fileset repair appended %d ops", vgId, pFileSet->fid, TARRAY2_SIZE(stagedOps));
    } else {
      tsdbError("vgId:%d fid:%d failed to append fileset ops since %s, code:%d", vgId, pFileSet->fid, tstrerror(code),
                code);
    }
  } else {
    tsdbInfo("vgId:%d fid:%d fileset repair finished with no change", vgId, pFileSet->fid);
  }

_exit:
  TARRAY2_DESTROY(stagedOps, NULL);
  return code;
}
1556

1557
// Commit the collected repair file ops through an fs edit.
//
// An empty `opArr` is a no-op that returns 0. Otherwise the edit is begun with
// TSDB_FEDIT_FORCE_REPAIR and committed while holding pTsdb->mutex. If the
// commit fails, the edit is aborted after the mutex has been released; the
// abort outcome is only logged and the commit's code is what gets returned.
//
// Returns 0 on success, or the non-zero code from the begin or commit step.
static int32_t tsdbForceRepairCommitChange(STFileSystem *pFS, const TFileOpArray *opArr) {
  STsdb  *pTsdb = pFS->tsdb;
  int32_t vgId = TD_VID(pTsdb->pVnode);
  int32_t code = 0;

  if (TARRAY2_SIZE(opArr) == 0) {
    tsdbInfo("vgId:%d skip repair commit because there are no file ops", vgId);
    return 0;
  }

  tsdbInfo("vgId:%d commit %d repair file ops", vgId, TARRAY2_SIZE(opArr));

  code = tsdbFSEditBegin(pFS, opArr, TSDB_FEDIT_FORCE_REPAIR);
  if (code != 0) {
    tsdbError("vgId:%d failed to begin repair fs edit since %s, code:%d", vgId, tstrerror(code), code);
    return code;
  }

  // The mutex covers only the commit call itself; it is released before any
  // logging or abort handling.
  (void)taosThreadMutexLock(&pTsdb->mutex);
  code = tsdbFSEditCommit(pFS);
  (void)taosThreadMutexUnlock(&pTsdb->mutex);

  if (code == 0) {
    tsdbInfo("vgId:%d committed repair fs edit successfully", vgId);
    return 0;
  }

  tsdbError("vgId:%d failed to commit repair fs edit since %s, code:%d", vgId, tstrerror(code),
            code);

  int32_t abortCode = tsdbFSEditAbort(pFS);
  if (abortCode == 0) {
    tsdbWarn("vgId:%d aborted repair fs edit after commit failure", vgId);
  } else {
    tsdbError("vgId:%d failed to abort repair fs edit after commit failure since %s, code:%d", vgId,
              tstrerror(abortCode), abortCode);
  }

  // Report the commit failure, not the abort result.
  return code;
}
1597

1598
// Entry point of the force repair for one file system.
//
// Iterates over fs->fSetArr, skipping file sets for which
// tsdbRepairShouldProcessFileSet() returns false, and lets
// tsdbForceRepairFileSet() collect file ops for the rest. If any file set
// reported a change, the accumulated ops are committed in one go via
// tsdbForceRepairCommitChange().
//
// Returns 0 on success, or the first non-zero code from a file set repair or
// from the commit. The accumulated op array is destroyed on every path.
int32_t tsdbForceRepair(STFileSystem *fs) {
  int32_t      code = 0;
  int32_t      vgId = TD_VID(fs->tsdb->pVnode);
  bool         hasChange = false;
  STFileSet   *pSet = NULL;
  TFileOpArray pendingOps[1];

  TARRAY2_INIT(pendingOps);

  tsdbInfo("vgId:%d start force repair for %d filesets", vgId, TARRAY2_SIZE(fs->fSetArr));

  TARRAY2_FOREACH(fs->fSetArr, pSet) {
    if (!tsdbRepairShouldProcessFileSet(vgId, pSet->fid)) {
      tsdbDebug("vgId:%d fid:%d skip fileset because it is not part of the explicit repair target set", vgId,
                pSet->fid);
      continue;
    }

    code = tsdbForceRepairFileSet(fs, pSet, pendingOps, &hasChange);
    if (code != 0) {
      tsdbError("vgId:%d failed to force repair fileset, fid:%d since %s, code:%d", vgId, pSet->fid,
                tstrerror(code), code);
      goto _exit;
    }
  }

  if (!hasChange) {
    tsdbInfo("vgId:%d force repair completed with no file changes", vgId);
  } else {
    code = tsdbForceRepairCommitChange(fs, pendingOps);
    if (code != 0) {
      goto _exit;
    }
    tsdbInfo("vgId:%d force repair completed with %d file ops", vgId, TARRAY2_SIZE(pendingOps));
  }

_exit:
  TARRAY2_DESTROY(pendingOps, NULL);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc