• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9336

pending completion
#9336

push

travis_ci

web-flow
Skip broken tsfile when recovering system (#9000)

2 of 2 new or added lines in 2 files covered. (100.0%)

65294 of 97950 relevant lines covered (66.66%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *      http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.iotdb.db.metadata.logfile;
20

21
import org.apache.iotdb.db.conf.IoTDBConfig;
22
import org.apache.iotdb.db.conf.IoTDBDescriptor;
23
import org.apache.iotdb.db.conf.SystemStatus;
24
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
25
import org.apache.iotdb.db.metadata.mnode.IMNode;
26
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
27
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
28
import org.apache.iotdb.db.metadata.path.PartialPath;
29
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
30
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
31
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
32
import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
33
import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
34
import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
35
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
36
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
37
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
38
import org.apache.iotdb.db.qp.physical.sys.DeactivateTemplatePlan;
39
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
40
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
41
import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
42
import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
43
import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
44
import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
45
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
46
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
47
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
48
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
49
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
50
import org.apache.iotdb.db.writelog.io.LogWriter;
51
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
52
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
53
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
54
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
55

56
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58

59
import java.io.File;
60
import java.io.IOException;
61
import java.nio.BufferOverflowException;
62
import java.nio.ByteBuffer;
63
import java.nio.file.Files;
64
import java.util.Collections;
65

66
public class MLogWriter implements AutoCloseable {
67

68
  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
69
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
70

71
  private final File logFile;
72
  private LogWriter logWriter;
73
  private int logNum;
74
  private final ByteBuffer mlogBuffer =
75
      ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
76

77
  private static final String LOG_TOO_LARGE_INFO =
78
      "Log cannot fit into buffer, please increase mlog_buffer_size";
79

80
  public MLogWriter(String schemaDir, String logFileName) throws IOException {
81
    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
82
    if (!metadataDir.exists()) {
83
      if (metadataDir.mkdirs()) {
84
        logger.info("create schema folder {}.", metadataDir);
85
      } else {
86
        logger.warn("create schema folder {} failed.", metadataDir);
87
      }
88
    }
89

90
    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
91
    logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
92
  }
93

94
  public MLogWriter(String logFilePath) throws IOException {
95
    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
96
    logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
97
  }
98

99
  @Override
100
  public void close() throws IOException {
101
    logWriter.close();
102
  }
103

104
  private void sync() {
105
    int retryCnt = 0;
106
    mlogBuffer.mark();
107
    while (true) {
108
      try {
109
        logWriter.write(mlogBuffer);
110
        break;
111
      } catch (IOException e) {
112
        if (retryCnt < 3) {
113
          logger.warn("MLog {} sync failed, retry it again", logFile.getAbsoluteFile(), e);
114
          mlogBuffer.reset();
115
          retryCnt++;
116
        } else {
117
          logger.error(
118
              "MLog {} sync failed, change system mode to error", logFile.getAbsoluteFile(), e);
119
          IoTDBDescriptor.getInstance().getConfig().setSystemStatus(SystemStatus.ERROR);
120
          break;
121
        }
122
      }
123
    }
124
    mlogBuffer.clear();
125
  }
126

127
  synchronized void putLog(PhysicalPlan plan) throws IOException {
128
    try {
129
      plan.serialize(mlogBuffer);
130
      sync();
131
      logNum++;
132
    } catch (BufferOverflowException e) {
133
      throw new IOException(LOG_TOO_LARGE_INFO, e);
134
    }
135
  }
136

137
  public void createTimeseries(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException {
138
    putLog(createTimeSeriesPlan);
139
  }
140

141
  public void createAlignedTimeseries(CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan)
142
      throws IOException {
143
    putLog(createAlignedTimeSeriesPlan);
144
  }
145

146
  public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
147
    putLog(deleteTimeSeriesPlan);
148
  }
149

150
  public void setStorageGroup(PartialPath storageGroup) throws IOException {
151
    SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
152
    putLog(plan);
153
  }
154

155
  public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
156
    DeleteStorageGroupPlan plan =
157
        new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
158
    putLog(plan);
159
  }
160

161
  public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
162
    SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
163
    putLog(plan);
164
  }
165

166
  public void changeOffset(PartialPath path, long offset) throws IOException {
167
    ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
168
    putLog(plan);
169
  }
170

171
  public void changeAlias(PartialPath path, String alias) throws IOException {
172
    ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
173
    putLog(plan);
174
  }
175

176
  public void createSchemaTemplate(CreateTemplatePlan plan) throws IOException {
177
    putLog(plan);
178
  }
179

180
  public void appendSchemaTemplate(AppendTemplatePlan plan) throws IOException {
181
    putLog(plan);
182
  }
183

184
  public void pruneSchemaTemplate(PruneTemplatePlan plan) throws IOException {
185
    putLog(plan);
186
  }
187

188
  public void setSchemaTemplate(SetTemplatePlan plan) throws IOException {
189
    putLog(plan);
190
  }
191

192
  public void unsetSchemaTemplate(UnsetTemplatePlan plan) throws IOException {
193
    putLog(plan);
194
  }
195

196
  public void dropSchemaTemplate(DropTemplatePlan plan) throws IOException {
197
    putLog(plan);
198
  }
199

200
  public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws IOException {
201
    putLog(plan);
202
  }
203

204
  public void serializeMNode(IMNode node) throws IOException {
205
    int childSize = 0;
206
    if (node.getChildren() != null) {
207
      childSize = node.getChildren().size();
208
    }
209
    MNodePlan plan = new MNodePlan(node.getName(), childSize);
210
    putLog(plan);
211
  }
212

213
  public void serializeMeasurementMNode(IMeasurementMNode node) throws IOException {
214
    int childSize = 0;
215
    if (node.getChildren() != null) {
216
      childSize = node.getChildren().size();
217
    }
218
    MeasurementMNodePlan plan =
219
        new MeasurementMNodePlan(
220
            node.getName(), node.getAlias(), node.getOffset(), childSize, node.getSchema());
221
    putLog(plan);
222
  }
223

224
  public void serializeStorageGroupMNode(IStorageGroupMNode node) throws IOException {
225
    int childSize = 0;
226
    if (node.getChildren() != null) {
227
      childSize = node.getChildren().size();
228
    }
229
    StorageGroupMNodePlan plan =
230
        new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
231
    putLog(plan);
232
  }
233

234
  public void setUsingSchemaTemplate(PartialPath path) throws IOException {
235
    ActivateTemplatePlan plan = new ActivateTemplatePlan(path);
236
    putLog(plan);
237
  }
238

239
  public void deactivateSchemaTemplate(DeactivateTemplatePlan plan) throws IOException {
240
    putLog(plan);
241
  }
242

243
  public synchronized void clear() throws IOException {
244
    sync();
245
    logWriter.close();
246
    mlogBuffer.clear();
247
    if (logFile != null && logFile.exists()) {
248
      Files.delete(logFile.toPath());
249
    }
250
    logNum = 0;
251
    logWriter = new LogWriter(logFile, false);
252
  }
253

254
  public synchronized int getLogNum() {
255
    return logNum;
256
  }
257

258
  /** only used for initialize a mlog file writer. */
259
  public synchronized void setLogNum(int number) {
260
    logNum = number;
261
  }
262

263
  public synchronized void force() throws IOException {
264
    logWriter.force();
265
  }
266

267
  public static synchronized PhysicalPlan convertFromString(String str) {
268
    String[] words = str.split(",");
269
    switch (words[0]) {
270
      case "2":
271
        return new MeasurementMNodePlan(
272
            words[1],
273
            "".equals(words[2]) ? null : words[2],
274
            Long.parseLong(words[words.length - 2]),
275
            Integer.parseInt(words[words.length - 1]),
276
            new MeasurementSchema(
277
                words[1],
278
                TSDataType.values()[Integer.parseInt(words[3])],
279
                TSEncoding.values()[Integer.parseInt(words[4])],
280
                CompressionType.deserialize((byte) Integer.parseInt(words[5]))));
281
      case "1":
282
        return new StorageGroupMNodePlan(
283
            words[1], Long.parseLong(words[2]), Integer.parseInt(words[3]));
284
      case "0":
285
        return new MNodePlan(words[1], Integer.parseInt(words[2]));
286
      default:
287
        logger.error("unknown cmd {}", str);
288
    }
289
    return null;
290
  }
291
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc