• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10019

07 Sep 2023 04:50AM UTC coverage: 47.489% (-0.2%) from 47.655%
#10019

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when a PipeHeartbeatEvent is being assigned (#11074)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

80551 of 169622 relevant lines covered (47.49%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ModelInfo.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.persistence;
21

22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.commons.model.ModelInformation;
24
import org.apache.iotdb.commons.model.ModelTable;
25
import org.apache.iotdb.commons.model.TrialInformation;
26
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
27
import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan;
28
import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan;
29
import org.apache.iotdb.confignode.consensus.request.read.model.ShowTrialPlan;
30
import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan;
31
import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan;
32
import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan;
33
import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStatePlan;
34
import org.apache.iotdb.confignode.consensus.response.model.GetModelInfoResp;
35
import org.apache.iotdb.confignode.consensus.response.model.ModelTableResp;
36
import org.apache.iotdb.confignode.consensus.response.model.TrialTableResp;
37
import org.apache.iotdb.rpc.TSStatusCode;
38
import org.apache.iotdb.tsfile.utils.PublicBAOS;
39

40
import org.apache.thrift.TException;
41
import org.slf4j.Logger;
42
import org.slf4j.LoggerFactory;
43

44
import javax.annotation.concurrent.ThreadSafe;
45

46
import java.io.DataOutputStream;
47
import java.io.File;
48
import java.io.FileInputStream;
49
import java.io.FileOutputStream;
50
import java.io.IOException;
51
import java.nio.ByteBuffer;
52
import java.util.concurrent.locks.ReadWriteLock;
53
import java.util.concurrent.locks.ReentrantReadWriteLock;
54

55
@ThreadSafe
56
public class ModelInfo implements SnapshotProcessor {
57

58
  private static final Logger LOGGER = LoggerFactory.getLogger(ModelInfo.class);
×
59

60
  private static final String SNAPSHOT_FILENAME = "model_info.snapshot";
61

62
  private ModelTable modelTable;
63

64
  private final ReadWriteLock modelTableLock = new ReentrantReadWriteLock();
×
65

66
  public ModelInfo() {
×
67
    this.modelTable = new ModelTable();
×
68
  }
×
69

70
  public void acquireModelTableReadLock() {
71
    LOGGER.info("acquire ModelTableReadLock");
×
72
    modelTableLock.readLock().lock();
×
73
  }
×
74

75
  public void releaseModelTableReadLock() {
76
    LOGGER.info("release ModelTableReadLock");
×
77
    modelTableLock.readLock().unlock();
×
78
  }
×
79

80
  public void acquireModelTableWriteLock() {
81
    LOGGER.info("acquire ModelTableWriteLock");
×
82
    modelTableLock.writeLock().lock();
×
83
  }
×
84

85
  public void releaseModelTableWriteLock() {
86
    LOGGER.info("release ModelTableWriteLock");
×
87
    modelTableLock.writeLock().unlock();
×
88
  }
×
89

90
  public TSStatus createModel(CreateModelPlan plan) {
91
    try {
92
      acquireModelTableWriteLock();
×
93
      ModelInformation modelInformation = plan.getModelInformation();
×
94
      if (modelTable.containsModel(modelInformation.getModelId())) {
×
95
        return new TSStatus(TSStatusCode.MODEL_EXIST_ERROR.getStatusCode())
×
96
            .setMessage(
×
97
                String.format(
×
98
                    "model [%s] has already been created.", modelInformation.getModelId()));
×
99
      } else {
100
        modelTable.addModel(modelInformation);
×
101
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
102
      }
103

104
    } catch (Exception e) {
×
105
      final String errorMessage =
×
106
          String.format(
×
107
              "Failed to add model [%s] in ModelTable on Config Nodes, because of %s",
108
              plan.getModelInformation().getModelId(), e);
×
109
      LOGGER.warn(errorMessage, e);
×
110
      return new TSStatus(TSStatusCode.CREATE_MODEL_ERROR.getStatusCode()).setMessage(errorMessage);
×
111
    } finally {
112
      releaseModelTableWriteLock();
×
113
    }
114
  }
115

116
  public TSStatus dropModel(DropModelPlan plan) {
117
    acquireModelTableWriteLock();
×
118
    String modelId = plan.getModelId();
×
119
    TSStatus status;
120
    if (modelTable.containsModel(modelId)) {
×
121
      modelTable.removeModel(modelId);
×
122
      status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
123
    } else {
124
      status =
×
125
          new TSStatus(TSStatusCode.DROP_MODEL_ERROR.getStatusCode())
×
126
              .setMessage(String.format("model [%s] has not been created.", modelId));
×
127
    }
128
    releaseModelTableWriteLock();
×
129
    return status;
×
130
  }
131

132
  public ModelTableResp showModel(ShowModelPlan plan) {
133
    acquireModelTableReadLock();
×
134
    try {
135
      ModelTableResp modelTableResp =
×
136
          new ModelTableResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
×
137
      if (plan.isSetModelId()) {
×
138
        ModelInformation modelInformation = modelTable.getModelInformationById(plan.getModelId());
×
139
        if (modelInformation != null) {
×
140
          modelTableResp.addModelInformation(modelInformation);
×
141
        }
142
      } else {
×
143
        modelTableResp.addModelInformation(modelTable.getAllModelInformation());
×
144
      }
145
      return modelTableResp;
×
146
    } catch (IOException e) {
×
147
      LOGGER.warn("Fail to get ModelTable", e);
×
148
      return new ModelTableResp(
×
149
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
150
              .setMessage(e.getMessage()));
×
151
    } finally {
152
      releaseModelTableReadLock();
×
153
    }
154
  }
155

156
  public TrialTableResp showTrial(ShowTrialPlan plan) {
157
    acquireModelTableReadLock();
×
158
    try {
159
      String modelId = plan.getModelId();
×
160
      ModelInformation modelInformation = modelTable.getModelInformationById(modelId);
×
161
      if (modelInformation == null) {
×
162
        return new TrialTableResp(
×
163
            new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
164
                .setMessage(
×
165
                    String.format(
×
166
                        "Failed to show trails of model [%s], this model has not been created.",
167
                        modelId)));
168
      }
169

170
      TrialTableResp trialTableResp =
×
171
          new TrialTableResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
×
172
      if (plan.isSetTrialId()) {
×
173
        TrialInformation trialInformation =
×
174
            modelInformation.getTrialInformationById(plan.getTrialId());
×
175
        if (trialInformation != null) {
×
176
          trialTableResp.addTrialInformation(trialInformation);
×
177
        }
178
      } else {
×
179
        trialTableResp.addTrialInformation(modelInformation.getAllTrialInformation());
×
180
      }
181
      return trialTableResp;
×
182
    } catch (IOException e) {
×
183
      LOGGER.warn("Fail to get TrailTable", e);
×
184
      return new TrialTableResp(
×
185
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
186
              .setMessage(e.getMessage()));
×
187
    } finally {
188
      releaseModelTableReadLock();
×
189
    }
190
  }
191

192
  public GetModelInfoResp getModelInfo(GetModelInfoPlan plan) {
193
    acquireModelTableReadLock();
×
194
    try {
195
      GetModelInfoResp getModelInfoResp =
×
196
          new GetModelInfoResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
×
197
      ModelInformation modelInformation = modelTable.getModelInformationById(plan.getModelId());
×
198
      if (modelInformation != null) {
×
199
        PublicBAOS buffer = new PublicBAOS();
×
200
        DataOutputStream stream = new DataOutputStream(buffer);
×
201
        modelInformation.serialize(stream);
×
202
        getModelInfoResp.setModelInfo(ByteBuffer.wrap(buffer.getBuf(), 0, buffer.size()));
×
203
      }
204
      return getModelInfoResp;
×
205
    } catch (IOException e) {
×
206
      LOGGER.warn("Fail to get model info", e);
×
207
      return new GetModelInfoResp(
×
208
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
209
              .setMessage(e.getMessage()));
×
210
    } finally {
211
      releaseModelTableReadLock();
×
212
    }
213
  }
214

215
  public TSStatus updateModelInfo(UpdateModelInfoPlan plan) {
216
    acquireModelTableWriteLock();
×
217
    try {
218
      String modelId = plan.getModelId();
×
219
      if (modelTable.containsModel(modelId)) {
×
220
        modelTable.updateModel(modelId, plan.getTrialId(), plan.getModelInfo());
×
221
      }
222
      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
223
    } finally {
224
      releaseModelTableWriteLock();
×
225
    }
226
  }
227

228
  public TSStatus updateModelState(UpdateModelStatePlan plan) {
229
    acquireModelTableWriteLock();
×
230
    try {
231
      String modelId = plan.getModelId();
×
232
      if (modelTable.containsModel(modelId)) {
×
233
        modelTable.updateState(modelId, plan.getState(), plan.getBestTrialId());
×
234
      }
235
      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
236
    } finally {
237
      releaseModelTableWriteLock();
×
238
    }
239
  }
240

241
  @Override
242
  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
243
    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
×
244
    if (snapshotFile.exists() && snapshotFile.isFile()) {
×
245
      LOGGER.error(
×
246
          "Failed to take snapshot of ModelInfo, because snapshot file [{}] is already exist.",
247
          snapshotFile.getAbsolutePath());
×
248
      return false;
×
249
    }
250

251
    acquireModelTableReadLock();
×
252
    try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
×
253
      modelTable.serialize(fileOutputStream);
×
254
      return true;
×
255
    } finally {
256
      releaseModelTableReadLock();
×
257
    }
258
  }
259

260
  @Override
261
  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
262
    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
×
263
    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
×
264
      LOGGER.error(
×
265
          "Failed to load snapshot of ModelInfo, snapshot file [{}] does not exist.",
266
          snapshotFile.getAbsolutePath());
×
267
      return;
×
268
    }
269
    acquireModelTableWriteLock();
×
270
    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
×
271
      modelTable.clear();
×
272
      modelTable = ModelTable.deserialize(fileInputStream);
×
273
    } finally {
274
      releaseModelTableWriteLock();
×
275
    }
276
  }
×
277
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc