• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9950

29 Aug 2023 06:53AM UTC coverage: 47.678% (-0.05%) from 47.724%
#9950

push

travis_ci

web-flow
[To rel/1.2] Feature add insertRecords method to rest service (#10838)

Co-authored-by: Cloudwise_Luke <282583553@qq.com>

177 of 177 new or added lines in 7 files covered. (100.0%)

80115 of 168032 relevant lines covered (47.68%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package org.apache.iotdb.db.protocol.rest.v2.impl;
19

20
import org.apache.iotdb.db.conf.IoTDBConfig;
21
import org.apache.iotdb.db.conf.IoTDBDescriptor;
22
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
23
import org.apache.iotdb.db.protocol.rest.handler.AuthorizationHandler;
24
import org.apache.iotdb.db.protocol.rest.utils.InsertTabletSortDataUtils;
25
import org.apache.iotdb.db.protocol.rest.v2.RestApiService;
26
import org.apache.iotdb.db.protocol.rest.v2.handler.ExceptionHandler;
27
import org.apache.iotdb.db.protocol.rest.v2.handler.ExecuteStatementHandler;
28
import org.apache.iotdb.db.protocol.rest.v2.handler.QueryDataSetHandler;
29
import org.apache.iotdb.db.protocol.rest.v2.handler.RequestValidationHandler;
30
import org.apache.iotdb.db.protocol.rest.v2.handler.StatementConstructionHandler;
31
import org.apache.iotdb.db.protocol.rest.v2.model.ExecutionStatus;
32
import org.apache.iotdb.db.protocol.rest.v2.model.InsertRecordsRequest;
33
import org.apache.iotdb.db.protocol.rest.v2.model.InsertTabletRequest;
34
import org.apache.iotdb.db.protocol.rest.v2.model.SQL;
35
import org.apache.iotdb.db.protocol.session.SessionManager;
36
import org.apache.iotdb.db.queryengine.plan.Coordinator;
37
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
38
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
39
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
40
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
41
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
42
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
43
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
44
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
45
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
46
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
47
import org.apache.iotdb.db.utils.SetThreadName;
48
import org.apache.iotdb.rpc.TSStatusCode;
49

50
import javax.ws.rs.core.Response;
51
import javax.ws.rs.core.SecurityContext;
52

53
import java.time.ZoneId;
54

55
public class RestApiServiceImpl extends RestApiService {
56

57
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
×
58

59
  private static final Coordinator COORDINATOR = Coordinator.getInstance();
×
60

61
  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
×
62

63
  private final IPartitionFetcher partitionFetcher;
64

65
  private final ISchemaFetcher schemaFetcher;
66
  private final AuthorizationHandler authorizationHandler;
67

68
  private final Integer defaultQueryRowLimit;
69

70
  public RestApiServiceImpl() {
×
71
    partitionFetcher = ClusterPartitionFetcher.getInstance();
×
72
    schemaFetcher = ClusterSchemaFetcher.getInstance();
×
73
    authorizationHandler = new AuthorizationHandler();
×
74
    defaultQueryRowLimit =
×
75
        IoTDBRestServiceDescriptor.getInstance().getConfig().getRestQueryDefaultRowSizeLimit();
×
76
  }
×
77

78
  @Override
79
  public Response executeNonQueryStatement(SQL sql, SecurityContext securityContext) {
80
    Long queryId = null;
×
81
    try {
82
      RequestValidationHandler.validateSQL(sql);
×
83

84
      Statement statement =
×
85
          StatementGenerator.createStatement(sql.getSql(), ZoneId.systemDefault());
×
86

87
      if (!ExecuteStatementHandler.validateStatement(statement)) {
×
88
        return Response.ok()
×
89
            .entity(
×
90
                new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
91
                    .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
92
                    .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
×
93
            .build();
×
94
      }
95

96
      Response response = authorizationHandler.checkAuthority(securityContext, statement);
×
97
      if (response != null) {
×
98
        return response;
×
99
      }
100
      queryId = SESSION_MANAGER.requestQueryId();
×
101
      ExecutionResult result =
×
102
          COORDINATOR.execute(
×
103
              statement,
104
              queryId,
×
105
              null,
106
              sql.getSql(),
×
107
              partitionFetcher,
108
              schemaFetcher,
109
              config.getQueryTimeoutThreshold());
×
110

111
      return Response.ok()
×
112
          .entity(
×
113
              (result.status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
114
                      || result.status.code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
115
                  ? new ExecutionStatus()
116
                      .code(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
117
                      .message(TSStatusCode.SUCCESS_STATUS.name())
×
118
                  : new ExecutionStatus()
119
                      .code(result.status.getCode())
×
120
                      .message(result.status.getMessage()))
×
121
          .build();
×
122
    } catch (Exception e) {
×
123
      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
×
124
    } finally {
125
      if (queryId != null) {
×
126
        COORDINATOR.cleanupQueryExecution(queryId);
×
127
      }
128
    }
129
  }
130

131
  @Override
132
  public Response executeQueryStatement(SQL sql, SecurityContext securityContext) {
133
    Long queryId = null;
×
134
    try {
135
      RequestValidationHandler.validateSQL(sql);
×
136

137
      Statement statement =
×
138
          StatementGenerator.createStatement(sql.getSql(), ZoneId.systemDefault());
×
139

140
      if (ExecuteStatementHandler.validateStatement(statement)) {
×
141
        return Response.ok()
×
142
            .entity(
×
143
                new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
144
                    .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
145
                    .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
×
146
            .build();
×
147
      }
148

149
      Response response = authorizationHandler.checkAuthority(securityContext, statement);
×
150
      if (response != null) {
×
151
        return response;
×
152
      }
153

154
      queryId = SESSION_MANAGER.requestQueryId();
×
155
      // create and cache dataset
156
      ExecutionResult result =
×
157
          COORDINATOR.execute(
×
158
              statement,
159
              queryId,
×
160
              null,
161
              sql.getSql(),
×
162
              partitionFetcher,
163
              schemaFetcher,
164
              config.getQueryTimeoutThreshold());
×
165
      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
166
          && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
167
        return Response.ok()
×
168
            .entity(
×
169
                new ExecutionStatus()
170
                    .code(result.status.getCode())
×
171
                    .message(result.status.getMessage()))
×
172
            .build();
×
173
      }
174
      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
×
175
      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
×
176
        return QueryDataSetHandler.fillQueryDataSet(
×
177
            queryExecution,
178
            statement,
179
            sql.getRowLimit() == null ? defaultQueryRowLimit : sql.getRowLimit());
×
180
      }
181
    } catch (Exception e) {
×
182
      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
×
183
    } finally {
184
      if (queryId != null) {
×
185
        COORDINATOR.cleanupQueryExecution(queryId);
×
186
      }
187
    }
188
  }
189

190
  @Override
191
  public Response insertRecords(
192
      InsertRecordsRequest insertRecordsRequest, SecurityContext securityContext) {
193
    Long queryId = null;
×
194
    try {
195
      RequestValidationHandler.validateInsertRecordsRequest(insertRecordsRequest);
×
196

197
      InsertRowsStatement insertRowsStatement =
×
198
          StatementConstructionHandler.createInsertRowsStatement(insertRecordsRequest);
×
199

200
      Response response = authorizationHandler.checkAuthority(securityContext, insertRowsStatement);
×
201
      if (response != null) {
×
202
        return response;
×
203
      }
204
      queryId = SESSION_MANAGER.requestQueryId();
×
205
      ExecutionResult result =
×
206
          COORDINATOR.execute(
×
207
              insertRowsStatement,
208
              SESSION_MANAGER.requestQueryId(),
×
209
              null,
210
              "",
211
              partitionFetcher,
212
              schemaFetcher,
213
              config.getQueryTimeoutThreshold());
×
214

215
      return Response.ok()
×
216
          .entity(
×
217
              (result.status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
218
                      || result.status.code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
219
                  ? new ExecutionStatus()
220
                      .code(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
221
                      .message(TSStatusCode.SUCCESS_STATUS.name())
×
222
                  : new ExecutionStatus()
223
                      .code(result.status.getCode())
×
224
                      .message(result.status.getMessage()))
×
225
          .build();
×
226
    } catch (Exception e) {
×
227
      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
×
228
    } finally {
229
      if (queryId != null) {
×
230
        COORDINATOR.cleanupQueryExecution(queryId);
×
231
      }
232
    }
233
  }
234

235
  @Override
236
  public Response insertTablet(
237
      InsertTabletRequest insertTabletRequest, SecurityContext securityContext) {
238
    Long queryId = null;
×
239
    try {
240
      RequestValidationHandler.validateInsertTabletRequest(insertTabletRequest);
×
241

242
      if (!InsertTabletSortDataUtils.checkSorted(insertTabletRequest.getTimestamps())) {
×
243
        int[] index =
×
244
            InsertTabletSortDataUtils.sortTimeStampList(insertTabletRequest.getTimestamps());
×
245
        insertTabletRequest.getTimestamps().sort(Long::compareTo);
×
246
        insertTabletRequest.setValues(
×
247
            InsertTabletSortDataUtils.sortList(
×
248
                insertTabletRequest.getValues(), index, insertTabletRequest.getDataTypes().size()));
×
249
      }
250

251
      InsertTabletStatement insertTabletStatement =
×
252
          StatementConstructionHandler.constructInsertTabletStatement(insertTabletRequest);
×
253

254
      Response response =
×
255
          authorizationHandler.checkAuthority(securityContext, insertTabletStatement);
×
256
      if (response != null) {
×
257
        return response;
×
258
      }
259
      queryId = SESSION_MANAGER.requestQueryId();
×
260
      ExecutionResult result =
×
261
          COORDINATOR.execute(
×
262
              insertTabletStatement,
263
              SESSION_MANAGER.requestQueryId(),
×
264
              null,
265
              "",
266
              partitionFetcher,
267
              schemaFetcher,
268
              config.getQueryTimeoutThreshold());
×
269

270
      return Response.ok()
×
271
          .entity(
×
272
              (result.status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
273
                      || result.status.code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
274
                  ? new ExecutionStatus()
275
                      .code(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
276
                      .message(TSStatusCode.SUCCESS_STATUS.name())
×
277
                  : new ExecutionStatus()
278
                      .code(result.status.getCode())
×
279
                      .message(result.status.getMessage()))
×
280
          .build();
×
281
    } catch (Exception e) {
×
282
      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
×
283
    } finally {
284
      if (queryId != null) {
×
285
        COORDINATOR.cleanupQueryExecution(queryId);
×
286
      }
287
    }
288
  }
289
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc