• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10018

07 Sep 2023 05:00AM UTC coverage: 47.717% (+0.03%) from 47.691%
#10018

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when an PipeHeartbeatEvent is being assigned (#11074) (#11075)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>
(cherry picked from commit ac0dd9d31)

1 of 1 new or added line in 1 file covered. (100.0%)

80262 of 168204 relevant lines covered (47.72%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.48
/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *      http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.rpc;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
25
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
26
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
27

28
import java.lang.reflect.Proxy;
29
import java.text.SimpleDateFormat;
30
import java.time.Instant;
31
import java.time.ZoneId;
32
import java.time.ZonedDateTime;
33
import java.time.format.DateTimeFormatter;
34
import java.util.HashMap;
35
import java.util.List;
36
import java.util.Map;
37
import java.util.stream.Collectors;
38

39
public class RpcUtils {
40

41
  /** How big should the default read and write buffers be? Defaults to 1KB */
42
  public static final int THRIFT_DEFAULT_BUF_CAPACITY = 1024;
43
  /**
44
   * It is used to prevent the size of the parsing package from being too large and allocating the
45
   * buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
46
   * reading. Thrift max frame size (16384000 bytes by default), we change it to 512MB.
47
   */
48
  public static final int THRIFT_FRAME_MAX_SIZE = 536870912;
49

50
  /**
51
   * if resizeIfNecessary is called continuously with a small size for more than
52
   * MAX_BUFFER_OVERSIZE_TIME times, we will shrink the buffer to reclaim space.
53
   */
54
  public static final int MAX_BUFFER_OVERSIZE_TIME = 5;
55

56
  public static final long MIN_SHRINK_INTERVAL = 60_000L;
57

58
  /** Not instantiable: this class only exposes static members. */
  private RpcUtils() {}
61

62
  public static final TSStatus SUCCESS_STATUS =
1✔
63
      new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
64

65
  public static IClientRPCService.Iface newSynchronizedClient(IClientRPCService.Iface client) {
66
    return (IClientRPCService.Iface)
×
67
        Proxy.newProxyInstance(
×
68
            RpcUtils.class.getClassLoader(),
×
69
            new Class[] {IClientRPCService.Iface.class},
70
            new SynchronizedHandler(client));
71
  }
72

73
  /**
74
   * verify success.
75
   *
76
   * @param status -status
77
   */
78
  public static void verifySuccess(TSStatus status) throws StatementExecutionException {
79
    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
1✔
80
      verifySuccess(status.getSubStatus());
×
81
      return;
×
82
    }
83
    if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
1✔
84
      return;
×
85
    }
86
    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
1✔
87
      throw new StatementExecutionException(status);
1✔
88
    }
89
  }
1✔
90

91
  public static void verifySuccessWithRedirection(TSStatus status)
92
      throws StatementExecutionException, RedirectException {
93
    verifySuccess(status);
×
94
    if (status.isSetRedirectNode()) {
×
95
      throw new RedirectException(status.getRedirectNode());
×
96
    }
97
  }
×
98

99
  public static void verifySuccessWithRedirectionForMultiDevices(
100
      TSStatus status, List<String> devices) throws StatementExecutionException, RedirectException {
101
    verifySuccess(status);
×
102
    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
×
103
        || status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
104
      Map<String, TEndPoint> deviceEndPointMap = new HashMap<>();
×
105
      List<TSStatus> statusSubStatus = status.getSubStatus();
×
106
      for (int i = 0; i < statusSubStatus.size(); i++) {
×
107
        TSStatus subStatus = statusSubStatus.get(i);
×
108
        if (subStatus.isSetRedirectNode()) {
×
109
          deviceEndPointMap.put(devices.get(i), subStatus.getRedirectNode());
×
110
        }
111
      }
112
      throw new RedirectException(deviceEndPointMap);
×
113
    }
114
  }
×
115

116
  public static void verifySuccess(List<TSStatus> statuses) throws BatchExecutionException {
117
    StringBuilder errMsgs =
×
118
        new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": ");
×
119
    for (TSStatus status : statuses) {
×
120
      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
121
          && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
122
        errMsgs.append(status.getMessage()).append("; ");
×
123
      }
124
    }
×
125
    if (errMsgs.length() > 0) {
×
126
      throw new BatchExecutionException(statuses, errMsgs.toString());
×
127
    }
128
  }
×
129

130
  /** convert from TSStatusCode to TSStatus according to status code and status message */
131
  public static TSStatus getStatus(TSStatusCode tsStatusCode) {
132
    return new TSStatus(tsStatusCode.getStatusCode());
1✔
133
  }
134

135
  public static TSStatus getStatus(List<TSStatus> statusList) {
136
    TSStatus status = new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode());
1✔
137
    status.setSubStatus(statusList);
1✔
138
    return status;
1✔
139
  }
140

141
  /**
142
   * convert from TSStatusCode to TSStatus, which has message appending with existed status message
143
   *
144
   * @param tsStatusCode status type
145
   * @param message appending message
146
   */
147
  public static TSStatus getStatus(TSStatusCode tsStatusCode, String message) {
148
    TSStatus status = new TSStatus(tsStatusCode.getStatusCode());
1✔
149
    status.setMessage(message);
1✔
150
    return status;
1✔
151
  }
152

153
  public static TSStatus getStatus(int code, String message) {
154
    TSStatus status = new TSStatus(code);
1✔
155
    status.setMessage(message);
1✔
156
    return status;
1✔
157
  }
158

159
  public static TSExecuteStatementResp getTSExecuteStatementResp(TSStatusCode tsStatusCode) {
160
    TSStatus status = getStatus(tsStatusCode);
×
161
    return getTSExecuteStatementResp(status);
×
162
  }
163

164
  public static TSExecuteStatementResp getTSExecuteStatementResp(
165
      TSStatusCode tsStatusCode, String message) {
166
    TSStatus status = getStatus(tsStatusCode, message);
×
167
    return getTSExecuteStatementResp(status);
×
168
  }
169

170
  public static TSExecuteStatementResp getTSExecuteStatementResp(TSStatus status) {
171
    TSExecuteStatementResp resp = new TSExecuteStatementResp();
×
172
    TSStatus tsStatus = new TSStatus(status);
×
173
    resp.setStatus(tsStatus);
×
174
    return resp;
×
175
  }
176

177
  public static TSFetchResultsResp getTSFetchResultsResp(TSStatusCode tsStatusCode) {
178
    TSStatus status = getStatus(tsStatusCode);
×
179
    return getTSFetchResultsResp(status);
×
180
  }
181

182
  public static TSFetchResultsResp getTSFetchResultsResp(
183
      TSStatusCode tsStatusCode, String appendMessage) {
184
    TSStatus status = getStatus(tsStatusCode, appendMessage);
×
185
    return getTSFetchResultsResp(status);
×
186
  }
187

188
  public static TSFetchResultsResp getTSFetchResultsResp(TSStatus status) {
189
    TSFetchResultsResp resp = new TSFetchResultsResp();
×
190
    TSStatus tsStatus = new TSStatus(status);
×
191
    resp.setStatus(tsStatus);
×
192
    return resp;
×
193
  }
194

195
  public static final String DEFAULT_TIME_FORMAT = "default";
196
  public static final String DEFAULT_TIMESTAMP_PRECISION = "ms";
197

198
  public static String setTimeFormat(String newTimeFormat) {
199
    String timeFormat;
200
    switch (newTimeFormat.trim().toLowerCase()) {
×
201
      case "long":
202
      case "number":
203
      case DEFAULT_TIME_FORMAT:
204
      case "iso8601":
205
        timeFormat = newTimeFormat.trim().toLowerCase();
×
206
        break;
×
207
      default:
208
        // use java default SimpleDateFormat to check whether input time format is legal
209
        // if illegal, it will throw an exception
210
        new SimpleDateFormat(newTimeFormat.trim());
×
211
        timeFormat = newTimeFormat;
×
212
        break;
213
    }
214
    return timeFormat;
×
215
  }
216

217
  public static String formatDatetime(
218
      String timeFormat, String timePrecision, long timestamp, ZoneId zoneId) {
219
    ZonedDateTime dateTime;
220
    switch (timeFormat) {
×
221
      case "long":
222
      case "number":
223
        return Long.toString(timestamp);
×
224
      case DEFAULT_TIME_FORMAT:
225
      case "iso8601":
226
        return parseLongToDateWithPrecision(
×
227
            DateTimeFormatter.ISO_OFFSET_DATE_TIME, timestamp, zoneId, timePrecision);
228
      default:
229
        dateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId);
×
230
        return dateTime.format(DateTimeFormatter.ofPattern(timeFormat));
×
231
    }
232
  }
233

234
  public static String formatDatetimeStr(String datetime, StringBuilder digits) {
235
    if (datetime.contains("+")) {
1✔
236
      String timeZoneStr = datetime.substring(datetime.length() - 6);
1✔
237
      return datetime.substring(0, datetime.length() - 6) + "." + digits + timeZoneStr;
1✔
238
    } else if (datetime.contains("Z")) {
1✔
239
      String timeZoneStr = datetime.substring(datetime.length() - 1);
1✔
240
      return datetime.substring(0, datetime.length() - 1) + "." + digits + timeZoneStr;
1✔
241
    } else {
242
      String timeZoneStr = "";
×
243
      return datetime + "." + digits + timeZoneStr;
×
244
    }
245
  }
246

247
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
248
  public static String parseLongToDateWithPrecision(
249
      DateTimeFormatter formatter, long timestamp, ZoneId zoneid, String timestampPrecision) {
250
    long integerOfDate;
251
    StringBuilder digits;
252
    if ("ms".equals(timestampPrecision)) {
1✔
253
      if (timestamp > 0 || timestamp % 1000 == 0) {
1✔
254
        integerOfDate = timestamp / 1000;
1✔
255
        digits = new StringBuilder(Long.toString(timestamp % 1000));
1✔
256
      } else {
257
        integerOfDate = timestamp / 1000 - 1;
1✔
258
        digits = new StringBuilder(Long.toString(1000 + timestamp % 1000));
1✔
259
      }
260
      ZonedDateTime dateTime =
1✔
261
          ZonedDateTime.ofInstant(Instant.ofEpochSecond(integerOfDate), zoneid);
1✔
262
      String datetime = dateTime.format(formatter);
1✔
263
      int length = digits.length();
1✔
264
      if (length != 3) {
1✔
265
        for (int i = 0; i < 3 - length; i++) {
1✔
266
          digits.insert(0, "0");
1✔
267
        }
268
      }
269
      return formatDatetimeStr(datetime, digits);
1✔
270
    } else if ("us".equals(timestampPrecision)) {
1✔
271
      if (timestamp > 0 || timestamp % 1000_000 == 0) {
1✔
272
        integerOfDate = timestamp / 1000_000;
1✔
273
        digits = new StringBuilder(Long.toString(timestamp % 1000_000));
1✔
274
      } else {
275
        integerOfDate = timestamp / 1000_000 - 1;
1✔
276
        digits = new StringBuilder(Long.toString(1000_000 + timestamp % 1000_000));
1✔
277
      }
278
      ZonedDateTime dateTime =
1✔
279
          ZonedDateTime.ofInstant(Instant.ofEpochSecond(integerOfDate), zoneid);
1✔
280
      String datetime = dateTime.format(formatter);
1✔
281
      int length = digits.length();
1✔
282
      if (length != 6) {
1✔
283
        for (int i = 0; i < 6 - length; i++) {
1✔
284
          digits.insert(0, "0");
1✔
285
        }
286
      }
287
      return formatDatetimeStr(datetime, digits);
1✔
288
    } else {
289
      if (timestamp > 0 || timestamp % 1000_000_000L == 0) {
1✔
290
        integerOfDate = timestamp / 1000_000_000L;
1✔
291
        digits = new StringBuilder(Long.toString(timestamp % 1000_000_000L));
1✔
292
      } else {
293
        integerOfDate = timestamp / 1000_000_000L - 1;
1✔
294
        digits = new StringBuilder(Long.toString(1000_000_000L + timestamp % 1000_000_000L));
1✔
295
      }
296
      ZonedDateTime dateTime =
1✔
297
          ZonedDateTime.ofInstant(Instant.ofEpochSecond(integerOfDate), zoneid);
1✔
298
      String datetime = dateTime.format(formatter);
1✔
299
      int length = digits.length();
1✔
300
      if (length != 9) {
1✔
301
        for (int i = 0; i < 9 - length; i++) {
1✔
302
          digits.insert(0, "0");
1✔
303
        }
304
      }
305
      return formatDatetimeStr(datetime, digits);
1✔
306
    }
307
  }
308

309
  public static TSStatus squashResponseStatusList(List<TSStatus> responseStatusList) {
310
    final List<TSStatus> failedStatus =
×
311
        responseStatusList.stream()
×
312
            .filter(status -> status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
313
            .collect(Collectors.toList());
×
314
    return failedStatus.isEmpty()
×
315
        ? new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
316
        : new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
317
            .setMessage(failedStatus.toString());
×
318
  }
319
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc