• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10019

07 Sep 2023 04:50AM UTC coverage: 47.489% (-0.2%) from 47.655%
#10019

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when an PipeHeartbeatEvent is being assigned (#11074)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

80551 of 169622 relevant lines covered (47.49%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.common.rpc.thrift.TaskType;
24
import org.apache.iotdb.commons.model.ForecastModeInformation;
25
import org.apache.iotdb.commons.model.ModelInformation;
26
import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan;
27
import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan;
28
import org.apache.iotdb.confignode.consensus.request.read.model.ShowTrialPlan;
29
import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan;
30
import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStatePlan;
31
import org.apache.iotdb.confignode.consensus.response.model.GetModelInfoResp;
32
import org.apache.iotdb.confignode.consensus.response.model.ModelTableResp;
33
import org.apache.iotdb.confignode.consensus.response.model.TrialTableResp;
34
import org.apache.iotdb.confignode.persistence.ModelInfo;
35
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
36
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
37
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;
38
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoResp;
39
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
40
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
41
import org.apache.iotdb.confignode.rpc.thrift.TShowTrialReq;
42
import org.apache.iotdb.confignode.rpc.thrift.TShowTrialResp;
43
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
45
import org.apache.iotdb.consensus.common.DataSet;
46
import org.apache.iotdb.consensus.exception.ConsensusException;
47
import org.apache.iotdb.rpc.TSStatusCode;
48
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
49

50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52

53
import java.io.IOException;
54
import java.util.Arrays;
55
import java.util.Collections;
56
import java.util.List;
57
import java.util.Map;
58
import java.util.stream.Collectors;
59

60
import static org.apache.iotdb.commons.model.ForecastModeInformation.DEFAULT_INPUT_LENGTH;
61
import static org.apache.iotdb.commons.model.ForecastModeInformation.DEFAULT_PREDICT_LENGTH;
62
import static org.apache.iotdb.commons.model.ForecastModeInformation.INPUT_LENGTH;
63
import static org.apache.iotdb.commons.model.ForecastModeInformation.INPUT_TYPE_LIST;
64
import static org.apache.iotdb.commons.model.ForecastModeInformation.PREDICT_INDEX_LIST;
65
import static org.apache.iotdb.commons.model.ForecastModeInformation.PREDICT_LENGTH;
66

67
public class ModelManager {
68

69
  private static final Logger LOGGER = LoggerFactory.getLogger(ModelManager.class);
×
70

71
  private final ConfigManager configManager;
72
  private final ModelInfo modelInfo;
73

74
  public ModelManager(ConfigManager configManager, ModelInfo modelInfo) {
×
75
    this.configManager = configManager;
×
76
    this.modelInfo = modelInfo;
×
77
  }
×
78

79
  public ModelInfo getModelInfo() {
80
    return modelInfo;
×
81
  }
82

83
  public TSStatus createModel(TCreateModelReq req) {
84
    TaskType taskType = req.getTaskType();
×
85

86
    Map<String, String> options = req.getOptions();
×
87
    ModelInformation modelInformation;
88
    if (taskType == TaskType.FORECAST) {
×
89
      String inputTypeListStr = options.get(INPUT_TYPE_LIST);
×
90
      List<TSDataType> inputTypeList =
×
91
          Arrays.stream(inputTypeListStr.substring(1, inputTypeListStr.length() - 1).split(","))
×
92
              .map(s -> TSDataType.valueOf(s.toUpperCase().trim()))
×
93
              .collect(Collectors.toList());
×
94

95
      String predictIndexListStr = options.get(PREDICT_INDEX_LIST);
×
96
      List<Integer> predictIndexList =
×
97
          Arrays.stream(
×
98
                  predictIndexListStr.substring(1, predictIndexListStr.length() - 1).split(","))
×
99
              .map(s -> Integer.valueOf(s.trim()))
×
100
              .collect(Collectors.toList());
×
101

102
      modelInformation =
×
103
          new ForecastModeInformation(
104
              req.getModelId(),
×
105
              req.getOptions(),
×
106
              req.getDatasetFetchSQL().replace("\n", "").replace("\t", " "),
×
107
              inputTypeList,
108
              predictIndexList,
109
              Integer.parseInt(options.getOrDefault(INPUT_LENGTH, DEFAULT_INPUT_LENGTH)),
×
110
              Integer.parseInt(options.getOrDefault(PREDICT_LENGTH, DEFAULT_PREDICT_LENGTH)));
×
111
    } else {
×
112
      throw new IllegalArgumentException("Invalid task type: " + taskType);
×
113
    }
114
    return configManager
×
115
        .getProcedureManager()
×
116
        .createModel(modelInformation, req.getHyperparameters());
×
117
  }
118

119
  public TSStatus dropModel(TDropModelReq req) {
120
    return configManager.getProcedureManager().dropModel(req.getModelId());
×
121
  }
122

123
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) {
124
    try {
125
      return configManager.getConsensusManager().write(new UpdateModelInfoPlan(req));
×
126
    } catch (ConsensusException e) {
×
127
      LOGGER.warn(
×
128
          String.format("Unexpected error happened while updating model %s: ", req.getModelId()),
×
129
          e);
130
      // consensus layer related errors
131
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
132
      res.setMessage(e.getMessage());
×
133
      return res;
×
134
    }
135
  }
136

137
  public TSStatus updateModelState(TUpdateModelStateReq req) {
138
    try {
139
      return configManager.getConsensusManager().write(new UpdateModelStatePlan(req));
×
140
    } catch (ConsensusException e) {
×
141
      LOGGER.warn(
×
142
          String.format(
×
143
              "Unexpected error happened while updating state of model %s: ", req.getModelId()),
×
144
          e);
145
      // consensus layer related errors
146
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
147
      res.setMessage(e.getMessage());
×
148
      return res;
×
149
    }
150
  }
151

152
  public TShowModelResp showModel(TShowModelReq req) {
153
    try {
154
      DataSet response = configManager.getConsensusManager().read(new ShowModelPlan(req));
×
155
      return ((ModelTableResp) response).convertToThriftResponse();
×
156
    } catch (ConsensusException e) {
×
157
      LOGGER.warn(
×
158
          String.format("Unexpected error happened while showing model %s: ", req.getModelId()), e);
×
159
      // consensus layer related errors
160
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
161
      res.setMessage(e.getMessage());
×
162
      return new TShowModelResp(res, Collections.emptyList());
×
163
    } catch (IOException e) {
×
164
      LOGGER.warn("Fail to get ModelTable", e);
×
165
      return new TShowModelResp(
×
166
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
167
              .setMessage(e.getMessage()),
×
168
          Collections.emptyList());
×
169
    }
170
  }
171

172
  public TShowTrialResp showTrial(TShowTrialReq req) {
173
    try {
174
      DataSet response = configManager.getConsensusManager().read(new ShowTrialPlan(req));
×
175
      return ((TrialTableResp) response).convertToThriftResponse();
×
176
    } catch (ConsensusException e) {
×
177
      LOGGER.warn(
×
178
          String.format("Unexpected error happened while showing trial %s: ", req.getModelId()), e);
×
179
      // consensus layer related errors
180
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
181
      res.setMessage(e.getMessage());
×
182
      return new TShowTrialResp(res, Collections.emptyList());
×
183
    } catch (IOException e) {
×
184
      LOGGER.warn("Fail to get TrailTable", e);
×
185
      return new TShowTrialResp(
×
186
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
187
              .setMessage(e.getMessage()),
×
188
          Collections.emptyList());
×
189
    }
190
  }
191

192
  public TGetModelInfoResp getModelInfo(TGetModelInfoReq req) {
193
    try {
194
      DataSet response = configManager.getConsensusManager().read(new GetModelInfoPlan(req));
×
195
      return ((GetModelInfoResp) response).convertToThriftResponse();
×
196
    } catch (ConsensusException e) {
×
197
      LOGGER.warn("Unexpected error happened while getting model: ", e);
×
198
      // consensus layer related errors
199
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
200
      res.setMessage(e.getMessage());
×
201
      return new TGetModelInfoResp(res);
×
202
    }
203
  }
204
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc