• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10013

06 Sep 2023 12:26PM CUT coverage: 47.691% (+0.04%) from 47.654%
#10013

push

travis_ci

web-flow
[To rel/1.2] Change display nums to fixed 1000 (#11039)

1 of 1 new or added line in 1 file covered. (100.0%)

80214 of 168196 relevant lines covered (47.69%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.43
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureStore.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.procedure.store;
21

22
import org.apache.iotdb.commons.conf.CommonDescriptor;
23
import org.apache.iotdb.commons.utils.TestOnly;
24
import org.apache.iotdb.confignode.procedure.Procedure;
25

26
import org.apache.commons.io.FileUtils;
27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
import java.io.File;
31
import java.io.IOException;
32
import java.nio.file.Files;
33
import java.nio.file.Path;
34
import java.nio.file.Paths;
35
import java.util.List;
36
import java.util.concurrent.ConcurrentHashMap;
37
import java.util.stream.Stream;
38

39
public class ProcedureStore implements IProcedureStore {
40

41
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStore.class);
1✔
42
  private String procedureWalDir =
1✔
43
      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
1✔
44
  private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new ConcurrentHashMap<>();
1✔
45
  public static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
46
  private final IProcedureFactory procedureFactory;
47
  private volatile boolean isRunning = false;
1✔
48

49
  public ProcedureStore(IProcedureFactory procedureFactory) {
×
50
    try {
51
      this.procedureFactory = procedureFactory;
×
52
      Files.createDirectories(Paths.get(procedureWalDir));
×
53
    } catch (IOException e) {
×
54
      throw new RuntimeException("Create procedure wal directory failed.", e);
×
55
    }
×
56
  }
×
57

58
  @TestOnly
59
  public ProcedureStore(String testWALDir, IProcedureFactory procedureFactory) {
1✔
60
    this.procedureFactory = procedureFactory;
1✔
61
    try {
62
      Files.createDirectories(Paths.get(testWALDir));
1✔
63
      procedureWalDir = testWALDir;
1✔
64
    } catch (IOException e) {
×
65
      throw new RuntimeException("Create procedure wal directory failed.", e);
×
66
    }
1✔
67
  }
1✔
68

69
  public boolean isRunning() {
70
    return this.isRunning;
1✔
71
  }
72

73
  public void setRunning(boolean running) {
74
    isRunning = running;
×
75
  }
×
76

77
  /**
78
   * Load procedure wal files into memory.
79
   *
80
   * @param procedureList procedureList
81
   */
82
  public void load(List<Procedure> procedureList) {
83
    try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
1✔
84
      s.filter(path -> path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
1✔
85
          .sorted(
1✔
86
              (p1, p2) ->
87
                  Long.compareUnsigned(
1✔
88
                      Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
1✔
89
                      Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
1✔
90
          .forEach(
1✔
91
              path -> {
92
                String fileName = path.getFileName().toString();
1✔
93
                long procId = Long.parseLong(fileName.split("\\.")[0]);
1✔
94
                ProcedureWAL procedureWAL =
1✔
95
                    procWALMap.computeIfAbsent(
1✔
96
                        procId, id -> new ProcedureWAL(path, procedureFactory));
1✔
97
                procedureWAL.load(procedureList);
1✔
98
              });
1✔
99
    } catch (IOException e) {
×
100
      LOG.error("Load procedure wal failed.", e);
×
101
    }
1✔
102
  }
1✔
103

104
  /**
105
   * Update procedure, roughly delete and create a new wal file.
106
   *
107
   * @param procedure procedure
108
   */
109
  public void update(Procedure procedure) {
110
    if (!procedure.needPersistance()) {
1✔
111
      procWALMap.remove(procedure.getProcId());
×
112
      return;
×
113
    }
114
    long procId = procedure.getProcId();
1✔
115
    Path path = Paths.get(procedureWalDir, procId + ProcedureStore.PROCEDURE_WAL_SUFFIX);
1✔
116
    ProcedureWAL procedureWAL =
1✔
117
        procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path, procedureFactory));
1✔
118
    try {
119
      procedureWAL.save(procedure);
1✔
120
    } catch (IOException e) {
×
121
      LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId());
×
122
    }
1✔
123
  }
1✔
124

125
  /**
126
   * Batch update
127
   *
128
   * @param subprocs procedure array
129
   */
130
  public void update(Procedure[] subprocs) {
131
    for (Procedure subproc : subprocs) {
1✔
132
      update(subproc);
1✔
133
    }
134
  }
1✔
135

136
  /**
137
   * Delete procedure wal file
138
   *
139
   * @param procId procedure id
140
   */
141
  public void delete(long procId) {
142
    ProcedureWAL procedureWAL = procWALMap.get(procId);
1✔
143
    if (procedureWAL != null) {
1✔
144
      procedureWAL.delete();
1✔
145
    }
146
    procWALMap.remove(procId);
1✔
147
  }
1✔
148

149
  /**
150
   * Batch delete
151
   *
152
   * @param childProcIds procedure id array
153
   */
154
  public void delete(long[] childProcIds) {
155
    for (long childProcId : childProcIds) {
1✔
156
      delete(childProcId);
1✔
157
    }
158
  }
1✔
159

160
  /**
161
   * Batch delete by index
162
   *
163
   * @param batchIds batchIds
164
   * @param startIndex start index
165
   * @param batchCount delete procedure count
166
   */
167
  public void delete(long[] batchIds, int startIndex, int batchCount) {
168
    for (int i = startIndex; i < batchCount; i++) {
×
169
      delete(batchIds[i]);
×
170
    }
171
  }
×
172

173
  /** clean all the wal, used for unit test. */
174
  public void cleanup() {
175
    try {
176
      FileUtils.cleanDirectory(new File(procedureWalDir));
1✔
177
    } catch (IOException e) {
×
178
      LOG.error("Clean wal directory failed", e);
×
179
    }
1✔
180
  }
1✔
181

182
  public void stop() {
183
    isRunning = false;
1✔
184
  }
1✔
185

186
  @Override
187
  public void start() {
188
    if (!isRunning) {
1✔
189
      isRunning = true;
1✔
190
    }
191
  }
1✔
192
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc