• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *      http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.path.PartialPath;
25
import org.apache.iotdb.commons.path.PathDeserializeUtil;
26
import org.apache.iotdb.commons.trigger.TriggerInformation;
27
import org.apache.iotdb.confignode.client.DataNodeRequestType;
28
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
29
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
30
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTransferringTriggersPlan;
31
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerJarPlan;
32
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLocationPlan;
33
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
34
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
35
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
36
import org.apache.iotdb.confignode.consensus.response.JarResp;
37
import org.apache.iotdb.confignode.consensus.response.trigger.TransferringTriggersResp;
38
import org.apache.iotdb.confignode.consensus.response.trigger.TriggerLocationResp;
39
import org.apache.iotdb.confignode.consensus.response.trigger.TriggerTableResp;
40
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
41
import org.apache.iotdb.confignode.manager.node.NodeManager;
42
import org.apache.iotdb.confignode.persistence.TriggerInfo;
43
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
45
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
46
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
47
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
48
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
49
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
50
import org.apache.iotdb.consensus.exception.ConsensusException;
51
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
52
import org.apache.iotdb.rpc.RpcUtils;
53
import org.apache.iotdb.rpc.TSStatusCode;
54
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
55
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
56
import org.apache.iotdb.trigger.api.enums.TriggerType;
57
import org.apache.iotdb.tsfile.utils.Binary;
58

59
import org.slf4j.Logger;
60
import org.slf4j.LoggerFactory;
61

62
import java.io.IOException;
63
import java.util.Collections;
64
import java.util.List;
65
import java.util.Map;
66
import java.util.Optional;
67

68
public class TriggerManager {
69

70
  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
×
71

72
  private final ConfigManager configManager;
73
  private final TriggerInfo triggerInfo;
74

75
  /**
   * Creates a manager that works on top of the given collaborators.
   *
   * @param configManager kept as-is; this class uses it to reach the node manager, the consensus
   *     manager and the procedure manager
   * @param triggerInfo kept as-is; this class uses it to decide whether a jar has to be saved and
   *     to acquire/release the trigger table lock
   */
  public TriggerManager(ConfigManager configManager, TriggerInfo triggerInfo) {
    this.triggerInfo = triggerInfo;
    this.configManager = configManager;
  }
×
79

80
  public TriggerInfo getTriggerInfo() {
81
    return triggerInfo;
×
82
  }
83

84
  /**
85
   * Create a trigger in cluster.
86
   *
87
   * <p>If TriggerType is STATELESS, we should create TriggerInstance on all DataNodes, the
88
   * DataNodeLocation in TriggerInformation will be null.
89
   *
90
   * <p>If TriggerType is STATEFUL, we should create TriggerInstance on the DataNode with the lowest
91
   * load, and DataNodeLocation of this DataNode will be saved.
92
   *
93
   * <p>All DataNodes will add TriggerInformation of this trigger in local TriggerTable.
94
   *
95
   * @param req the createTrigger request
96
   * @return status of create this trigger
97
   */
98
  public TSStatus createTrigger(TCreateTriggerReq req) {
99
    final boolean isStateful = TriggerType.construct(req.getTriggerType()) == TriggerType.STATEFUL;
×
100
    TDataNodeLocation dataNodeLocation = null;
×
101
    if (isStateful) {
×
102
      Optional<TDataNodeLocation> targetDataNode =
×
103
          configManager.getNodeManager().getLowestLoadDataNode();
×
104
      if (targetDataNode.isPresent()) {
×
105
        dataNodeLocation = targetDataNode.get();
×
106
      } else {
107
        return new TSStatus(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
×
108
      }
109
    }
110
    final String triggerName = req.getTriggerName();
×
111
    final boolean isUsingURI = req.isIsUsingURI();
×
112
    final boolean needToSaveJar = isUsingURI && triggerInfo.needToSaveJar(triggerName);
×
113
    TriggerInformation triggerInformation =
×
114
        new TriggerInformation(
115
            (PartialPath) PathDeserializeUtil.deserialize(req.pathPattern),
×
116
            triggerName,
117
            req.getClassName(),
×
118
            isUsingURI,
119
            req.getJarName(),
×
120
            req.getAttributes(),
×
121
            TriggerEvent.construct(req.triggerEvent),
×
122
            TTriggerState.INACTIVE,
123
            isStateful,
124
            dataNodeLocation,
125
            FailureStrategy.construct(req.getFailureStrategy()),
×
126
            req.getJarMD5());
×
127
    return configManager
×
128
        .getProcedureManager()
×
129
        .createTrigger(triggerInformation, needToSaveJar ? new Binary(req.getJarFile()) : null);
×
130
  }
131

132
  public TSStatus dropTrigger(TDropTriggerReq req) {
133
    return configManager.getProcedureManager().dropTrigger(req.getTriggerName());
×
134
  }
135

136
  public TGetTriggerTableResp getTriggerTable(boolean onlyStateful) {
137
    try {
138
      return ((TriggerTableResp)
×
139
              configManager.getConsensusManager().read(new GetTriggerTablePlan(onlyStateful)))
×
140
          .convertToThriftResponse();
×
141
    } catch (IOException | ConsensusException e) {
×
142
      LOGGER.error("Fail to get TriggerTable", e);
×
143
      return new TGetTriggerTableResp(
×
144
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
145
              .setMessage(e.getMessage()),
×
146
          Collections.emptyList());
×
147
    }
148
  }
149

150
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) {
151
    try {
152
      return ((TriggerLocationResp)
×
153
              configManager.getConsensusManager().read(new GetTriggerLocationPlan(triggerName)))
×
154
          .convertToThriftResponse();
×
155
    } catch (ConsensusException e) {
×
156
      LOGGER.warn("Failed in the read API executing the consensus layer due to: ", e);
×
157
      return new TGetLocationForTriggerResp(
×
158
          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
×
159
              .setMessage(e.getMessage()));
×
160
    }
161
  }
162

163
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
164
    try {
165
      return ((JarResp)
×
166
              configManager.getConsensusManager().read(new GetTriggerJarPlan(req.getJarNameList())))
×
167
          .convertToThriftResponse();
×
168
    } catch (ConsensusException e) {
×
169
      LOGGER.warn("Failed in the read API executing the consensus layer due to: ", e);
×
170
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
171
      res.setMessage(e.getMessage());
×
172
      return new JarResp(res, Collections.emptyList()).convertToThriftResponse();
×
173
    }
174
  }
175

176
  /**
177
   * Step1: Mark Stateful Triggers on UnknownDataNodes as {@link TTriggerState#TRANSFERRING}.
178
   *
179
   * <p>Step2: Get all Transferring Triggers marked in Step1.
180
   *
181
   * <p>Step3: For each trigger gotten in Step2, find the DataNode with the lowest load, then
182
   * transfer the Stateful Trigger to it and update this information on all DataNodes.
183
   *
184
   * <p>Step4: Update the newest location on ConfigNodes.
185
   *
186
   * @param dataNodeLocationMap The DataNodes with {@link
187
   *     org.apache.iotdb.commons.cluster.NodeStatus#Running} State
188
   * @return result of transferTrigger
189
   */
190
  public TSStatus transferTrigger(
191
      List<TDataNodeLocation> newUnknownDataNodeList,
192
      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
193
    TSStatus transferResult;
194
    triggerInfo.acquireTriggerTableLock();
×
195
    try {
196
      ConsensusManager consensusManager = configManager.getConsensusManager();
×
197
      NodeManager nodeManager = configManager.getNodeManager();
×
198

199
      transferResult =
×
200
          consensusManager.write(new UpdateTriggersOnTransferNodesPlan(newUnknownDataNodeList));
×
201
      if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
202
        return transferResult;
×
203
      }
204

205
      List<String> transferringTriggers =
×
206
          ((TransferringTriggersResp) consensusManager.read(new GetTransferringTriggersPlan()))
×
207
              .getTransferringTriggers();
×
208

209
      for (String trigger : transferringTriggers) {
×
210
        TDataNodeLocation newDataNodeLocation =
×
211
            nodeManager.getLowestLoadDataNode(dataNodeLocationMap.keySet());
×
212

213
        transferResult =
×
214
            RpcUtils.squashResponseStatusList(
×
215
                updateTriggerLocation(trigger, newDataNodeLocation, dataNodeLocationMap));
×
216
        if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
217
          return transferResult;
×
218
        }
219

220
        transferResult =
×
221
            consensusManager.write(new UpdateTriggerLocationPlan(trigger, newDataNodeLocation));
×
222
        if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
223
          return transferResult;
×
224
        }
225
      }
×
226
    } catch (ConsensusException e) {
×
227
      LOGGER.warn("Failed in the read/write API executing the consensus layer due to: ", e);
×
228
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
229
      res.setMessage(e.getMessage());
×
230
      return res;
×
231
    } finally {
232
      triggerInfo.releaseTriggerTableLock();
×
233
    }
234

235
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
236
  }
237

238
  public List<TSStatus> updateTriggerLocation(
239
      String triggerName,
240
      TDataNodeLocation dataNodeLocation,
241
      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
242
    final TUpdateTriggerLocationReq request =
×
243
        new TUpdateTriggerLocationReq(triggerName, dataNodeLocation);
244

245
    AsyncClientHandler<TUpdateTriggerLocationReq, TSStatus> clientHandler =
×
246
        new AsyncClientHandler<>(
247
            DataNodeRequestType.UPDATE_TRIGGER_LOCATION, request, dataNodeLocationMap);
248
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
249
    return clientHandler.getResponseList();
×
250
  }
251
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc