• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.conf.IoTDBConstant;
25
import org.apache.iotdb.commons.udf.UDFInformation;
26
import org.apache.iotdb.confignode.client.DataNodeRequestType;
27
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
28
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
29
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
30
import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan;
31
import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan;
32
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
33
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
34
import org.apache.iotdb.confignode.consensus.response.JarResp;
35
import org.apache.iotdb.confignode.consensus.response.function.FunctionTableResp;
36
import org.apache.iotdb.confignode.persistence.UDFInfo;
37
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
38
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
39
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
40
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
41
import org.apache.iotdb.consensus.exception.ConsensusException;
42
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
43
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
44
import org.apache.iotdb.rpc.RpcUtils;
45
import org.apache.iotdb.rpc.TSStatusCode;
46
import org.apache.iotdb.tsfile.utils.Binary;
47

48
import org.slf4j.Logger;
49
import org.slf4j.LoggerFactory;
50

51
import java.io.IOException;
52
import java.util.Collections;
53
import java.util.List;
54
import java.util.Map;
55

56
public class UDFManager {
57

58
  private static final Logger LOGGER = LoggerFactory.getLogger(UDFManager.class);
×
59

60
  private final ConfigManager configManager;
61
  private final UDFInfo udfInfo;
62

63
  private final long planSizeLimit =
×
64
      ConfigNodeDescriptor.getInstance()
×
65
              .getConf()
×
66
              .getConfigNodeRatisConsensusLogAppenderBufferSize()
×
67
          - IoTDBConstant.RAFT_LOG_BASIC_SIZE;
68

69
  public UDFManager(ConfigManager configManager, UDFInfo udfInfo) {
×
70
    this.configManager = configManager;
×
71
    this.udfInfo = udfInfo;
×
72
  }
×
73

74
  public UDFInfo getUdfInfo() {
75
    return udfInfo;
×
76
  }
77

78
  public TSStatus createFunction(TCreateFunctionReq req) {
79
    udfInfo.acquireUDFTableLock();
×
80
    try {
81
      final boolean isUsingURI = req.isIsUsingURI();
×
82
      final String udfName = req.udfName.toUpperCase();
×
83
      final String jarMD5 = req.getJarMD5();
×
84
      final String jarName = req.getJarName();
×
85
      final byte[] jarFile = req.getJarFile();
×
86
      udfInfo.validate(udfName, jarName, jarMD5);
×
87

88
      final UDFInformation udfInformation =
×
89
          new UDFInformation(udfName, req.getClassName(), false, isUsingURI, jarName, jarMD5);
×
90
      final boolean needToSaveJar = isUsingURI && udfInfo.needToSaveJar(jarName);
×
91

92
      LOGGER.info(
×
93
          "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]", udfName, needToSaveJar);
×
94

95
      final TSStatus dataNodesStatus =
×
96
          RpcUtils.squashResponseStatusList(
×
97
              createFunctionOnDataNodes(udfInformation, needToSaveJar ? jarFile : null));
×
98
      if (dataNodesStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
99
        return dataNodesStatus;
×
100
      }
101

102
      CreateFunctionPlan createFunctionPlan =
×
103
          new CreateFunctionPlan(udfInformation, needToSaveJar ? new Binary(jarFile) : null);
×
104
      if (needToSaveJar && createFunctionPlan.getSerializedSize() > planSizeLimit) {
×
105
        return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
×
106
            .setMessage(
×
107
                String.format(
×
108
                    "Fail to create UDF[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
109
                    udfName));
110
      }
111

112
      LOGGER.info("Start to add UDF [{}] in UDF_Table on Config Nodes", udfName);
×
113

114
      return configManager.getConsensusManager().write(createFunctionPlan);
×
115
    } catch (Exception e) {
×
116
      LOGGER.warn(e.getMessage(), e);
×
117
      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
118
          .setMessage(e.getMessage());
×
119
    } finally {
120
      udfInfo.releaseUDFTableLock();
×
121
    }
122
  }
123

124
  private List<TSStatus> createFunctionOnDataNodes(UDFInformation udfInformation, byte[] jarFile)
125
      throws IOException {
126
    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
127
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
128
    final TCreateFunctionInstanceReq req =
×
129
        new TCreateFunctionInstanceReq(udfInformation.serialize()).setJarFile(jarFile);
×
130
    AsyncClientHandler<TCreateFunctionInstanceReq, TSStatus> clientHandler =
×
131
        new AsyncClientHandler<>(DataNodeRequestType.CREATE_FUNCTION, req, dataNodeLocationMap);
132
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
133
    return clientHandler.getResponseList();
×
134
  }
135

136
  public TSStatus dropFunction(String functionName) {
137
    functionName = functionName.toUpperCase();
×
138
    udfInfo.acquireUDFTableLock();
×
139
    try {
140
      udfInfo.validate(functionName);
×
141

142
      TSStatus result = RpcUtils.squashResponseStatusList(dropFunctionOnDataNodes(functionName));
×
143
      if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
144
        return result;
×
145
      }
146

147
      return configManager.getConsensusManager().write(new DropFunctionPlan(functionName));
×
148
    } catch (Exception e) {
×
149
      LOGGER.warn(e.getMessage(), e);
×
150
      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
151
          .setMessage(e.getMessage());
×
152
    } finally {
153
      udfInfo.releaseUDFTableLock();
×
154
    }
155
  }
156

157
  private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
158
    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
159
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
160

161
    final TDropFunctionInstanceReq request = new TDropFunctionInstanceReq(functionName, false);
×
162

163
    AsyncClientHandler<TDropFunctionInstanceReq, TSStatus> clientHandler =
×
164
        new AsyncClientHandler<>(DataNodeRequestType.DROP_FUNCTION, request, dataNodeLocationMap);
165
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
166
    return clientHandler.getResponseList();
×
167
  }
168

169
  public TGetUDFTableResp getUDFTable() {
170
    try {
171
      return ((FunctionTableResp)
×
172
              configManager.getConsensusManager().read(new GetFunctionTablePlan()))
×
173
          .convertToThriftResponse();
×
174
    } catch (IOException | ConsensusException e) {
×
175
      LOGGER.error("Fail to get TriggerTable", e);
×
176
      return new TGetUDFTableResp(
×
177
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
178
              .setMessage(e.getMessage()),
×
179
          Collections.emptyList());
×
180
    }
181
  }
182

183
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
184
    try {
185
      return ((JarResp)
×
186
              configManager.getConsensusManager().read(new GetUDFJarPlan(req.getJarNameList())))
×
187
          .convertToThriftResponse();
×
188
    } catch (ConsensusException e) {
×
189
      LOGGER.warn("Failed in the read API executing the consensus layer due to: ", e);
×
190
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
191
      res.setMessage(e.getMessage());
×
192
      return new JarResp(res, Collections.emptyList()).convertToThriftResponse();
×
193
    }
194
  }
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc