• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9890

22 Aug 2023 09:07AM UTC coverage: 47.922% (-0.07%) from 47.992%
#9890

push

travis_ci

web-flow
[IOTDB-6114] Pipe: Support multi-cluster data sync (#10868)(#10926)

306 of 306 new or added lines in 33 files covered. (100.0%)

79862 of 166649 relevant lines covered (47.92%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.88
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.trigger.executor;
21

22
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23
import org.apache.iotdb.commons.client.IClientManager;
24
import org.apache.iotdb.commons.client.exception.ClientManagerException;
25
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
26
import org.apache.iotdb.commons.consensus.ConfigRegionId;
27
import org.apache.iotdb.commons.path.PartialPath;
28
import org.apache.iotdb.commons.trigger.TriggerInformation;
29
import org.apache.iotdb.commons.trigger.TriggerTable;
30
import org.apache.iotdb.commons.trigger.exception.TriggerExecutionException;
31
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
32
import org.apache.iotdb.db.conf.IoTDBDescriptor;
33
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
34
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
35
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
36
import org.apache.iotdb.db.queryengine.plan.Coordinator;
37
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
38
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
39
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
40
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
41
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
42
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
43
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
44
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
45
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
46
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
47
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
48
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
49
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
50
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
51
import org.apache.iotdb.tsfile.utils.BitMap;
52
import org.apache.iotdb.tsfile.write.record.Tablet;
53
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
54

55
import org.apache.thrift.TException;
56
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58

59
import java.io.IOException;
60
import java.util.ArrayList;
61
import java.util.Arrays;
62
import java.util.Collections;
63
import java.util.HashMap;
64
import java.util.List;
65
import java.util.Map;
66
import java.util.stream.Collectors;
67

68
public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEvent> {
1✔
69

70
  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireVisitor.class);
1✔
71

72
  private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
73
      ConfigNodeClientManager.getInstance();
1✔
74

75
  /**
76
   * Maximum number of attempts made when firing a trigger whose executor is on another DataNode.
77
   */
78
  private static final int FIRE_RETRY_NUM =
1✔
79
      IoTDBDescriptor.getInstance().getConfig().getRetryNumToFindStatefulTrigger();
1✔
80

81
  @Override
82
  public TriggerFireResult process(PlanNode node, TriggerEvent context) {
83
    if (TriggerManagementService.getInstance().isTriggerTableEmpty()) {
1✔
84
      return TriggerFireResult.SUCCESS;
×
85
    }
86
    return node.accept(this, context);
1✔
87
  }
88

89
  @Override
90
  public TriggerFireResult visitPlan(PlanNode node, TriggerEvent context) {
91
    return TriggerFireResult.SUCCESS;
×
92
  }
93

94
  @Override
95
  public TriggerFireResult visitInsertRow(InsertRowNode node, TriggerEvent context) {
96
    Map<String, List<String>> triggerNameToMeasurementList =
1✔
97
        constructTriggerNameToMeasurementListMap(node, context);
1✔
98
    // return success if no trigger is found
99
    if (triggerNameToMeasurementList.isEmpty()) {
1✔
100
      return TriggerFireResult.SUCCESS;
×
101
    }
102

103
    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
1✔
104
    Map<String, Integer> measurementToSchemaIndexMap =
1✔
105
        constructMeasurementToSchemaIndexMap(node.getMeasurements(), measurementSchemas);
1✔
106

107
    Object[] values = node.getValues();
1✔
108
    long time = node.getTime();
1✔
109
    boolean hasFailedTrigger = false;
1✔
110
    for (Map.Entry<String, List<String>> entry : triggerNameToMeasurementList.entrySet()) {
1✔
111
      List<MeasurementSchema> schemas =
1✔
112
          entry.getValue().stream()
1✔
113
              .map(measurement -> measurementSchemas[measurementToSchemaIndexMap.get(measurement)])
1✔
114
              .collect(Collectors.toList());
1✔
115
      // only one row
116
      Tablet tablet = new Tablet(node.getDevicePath().getFullPath(), schemas, 1);
1✔
117
      // add one row
118
      tablet.rowSize++;
1✔
119
      tablet.addTimestamp(0, time);
1✔
120
      for (String measurement : entry.getValue()) {
1✔
121
        tablet.addValue(measurement, 0, values[measurementToSchemaIndexMap.get(measurement)]);
1✔
122
      }
1✔
123
      TriggerFireResult result = fire(entry.getKey(), tablet, context);
1✔
124
      // Terminate if a trigger with pessimistic strategy messes up
125
      if (result.equals(TriggerFireResult.TERMINATION)) {
1✔
126
        return result;
×
127
      }
128
      if (result.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
1✔
129
        hasFailedTrigger = true;
×
130
      }
131
    }
1✔
132
    return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
1✔
133
  }
134

135
  @Override
136
  public TriggerFireResult visitInsertTablet(InsertTabletNode node, TriggerEvent context) {
137
    // group Triggers and measurements
138
    Map<String, List<String>> triggerNameToMeasurementList =
×
139
        constructTriggerNameToMeasurementListMap(node, context);
×
140
    // return success if no trigger is found
141
    if (triggerNameToMeasurementList.isEmpty()) {
×
142
      return TriggerFireResult.SUCCESS;
×
143
    }
144

145
    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
×
146
    Map<String, Integer> measurementToSchemaIndexMap =
×
147
        constructMeasurementToSchemaIndexMap(node.getMeasurements(), measurementSchemas);
×
148

149
    Object[] columns = node.getColumns();
×
150
    BitMap[] bitMaps = node.getBitMaps();
×
151
    long[] timestamps = node.getTimes();
×
152
    int rowCount = node.getRowCount();
×
153
    boolean hasFailedTrigger = false;
×
154
    for (Map.Entry<String, List<String>> entry : triggerNameToMeasurementList.entrySet()) {
×
155
      Tablet tablet;
156
      if (entry.getValue().size() == measurementSchemas.length) {
×
157
        // all measurements are included
158
        tablet =
×
159
            new Tablet(
160
                node.getDevicePath().getFullPath(),
×
161
                Arrays.asList(measurementSchemas),
×
162
                timestamps,
163
                columns,
164
                bitMaps,
165
                rowCount);
166
      } else {
167
        // choose specified columns
168
        List<MeasurementSchema> schemas =
×
169
            entry.getValue().stream()
×
170
                .map(
×
171
                    measurement -> measurementSchemas[measurementToSchemaIndexMap.get(measurement)])
×
172
                .collect(Collectors.toList());
×
173
        Object[] columnsOfNewTablet =
×
174
            entry.getValue().stream()
×
175
                .map(measurement -> columns[measurementToSchemaIndexMap.get(measurement)])
×
176
                .toArray();
×
177
        BitMap[] bitMapsOfNewTablet = new BitMap[entry.getValue().size()];
×
178
        if (bitMaps != null) {
×
179
          for (int i = 0; i < entry.getValue().size(); i++) {
×
180
            bitMapsOfNewTablet[i] =
×
181
                bitMaps[measurementToSchemaIndexMap.get(entry.getValue().get(i))];
×
182
          }
183
        }
184
        tablet =
×
185
            new Tablet(
186
                node.getDevicePath().getFullPath(),
×
187
                schemas,
188
                timestamps,
189
                columnsOfNewTablet,
190
                bitMapsOfNewTablet,
191
                rowCount);
192
      }
193

194
      TriggerFireResult result = fire(entry.getKey(), tablet, context);
×
195
      // Terminate if a trigger with pessimistic strategy messes up
196
      if (result.equals(TriggerFireResult.TERMINATION)) {
×
197
        return result;
×
198
      }
199
      if (result.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
×
200
        hasFailedTrigger = true;
×
201
      }
202
    }
×
203
    return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
×
204
  }
205

206
  @Override
207
  public TriggerFireResult visitInsertRows(InsertRowsNode node, TriggerEvent context) {
208
    boolean hasFailedTrigger = false;
×
209
    for (InsertRowNode insertRowNode : node.getInsertRowNodeList()) {
×
210
      TriggerFireResult result = visitInsertRow(insertRowNode, context);
×
211
      if (result.equals(TriggerFireResult.TERMINATION)) {
×
212
        return result;
×
213
      }
214
      if (result.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
×
215
        hasFailedTrigger = true;
×
216
      }
217
    }
×
218
    return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
×
219
  }
220

221
  @Override
222
  public TriggerFireResult visitInsertMultiTablets(
223
      InsertMultiTabletsNode node, TriggerEvent context) {
224
    boolean hasFailedTrigger = false;
×
225
    for (InsertTabletNode insertTabletNode : node.getInsertTabletNodeList()) {
×
226
      TriggerFireResult result = visitInsertTablet(insertTabletNode, context);
×
227
      if (result.equals(TriggerFireResult.TERMINATION)) {
×
228
        return result;
×
229
      }
230
      if (result.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
×
231
        hasFailedTrigger = true;
×
232
      }
233
    }
×
234
    return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
×
235
  }
236

237
  @Override
238
  public TriggerFireResult visitInsertRowsOfOneDevice(
239
      InsertRowsOfOneDeviceNode node, TriggerEvent context) {
240
    boolean hasFailedTrigger = false;
×
241
    for (InsertRowNode insertRowNode : node.getInsertRowNodeList()) {
×
242
      TriggerFireResult result = visitInsertRow(insertRowNode, context);
×
243
      if (result.equals(TriggerFireResult.TERMINATION)) {
×
244
        return result;
×
245
      }
246
      if (result.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
×
247
        hasFailedTrigger = true;
×
248
      }
249
    }
×
250
    return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
×
251
  }
252

253
  @Override
254
  public TriggerFireResult visitPipeEnrichedInsert(
255
      PipeEnrichedInsertNode node, TriggerEvent context) {
256
    final InsertNode realInsertNode = node.getInsertNode();
×
257
    if (realInsertNode instanceof InsertRowNode) {
×
258
      return visitInsertRow((InsertRowNode) realInsertNode, context);
×
259
    } else if (realInsertNode instanceof InsertTabletNode) {
×
260
      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
×
261
    } else if (realInsertNode instanceof InsertRowsNode) {
×
262
      return visitInsertRows((InsertRowsNode) realInsertNode, context);
×
263
    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
×
264
      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, context);
×
265
    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
×
266
      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) realInsertNode, context);
×
267
    } else {
268
      return visitPlan(realInsertNode, context);
×
269
    }
270
  }
271

272
  private Map<String, Integer> constructMeasurementToSchemaIndexMap(
273
      String[] measurements, MeasurementSchema[] schemas) {
274
    // The index of measurement and schema is the same now.
275
    // However, in case one day the order changes, we need to construct an index map.
276
    Map<String, Integer> indexMap = new HashMap<>();
1✔
277
    for (int i = 0, n = measurements.length; i < n; i++) {
1✔
278
      if (measurements[i] == null) {
1✔
279
        continue;
×
280
      }
281
      // It is the same now
282
      if (schemas[i] != null && schemas[i].getMeasurementId().equals(measurements[i])) {
1✔
283
        indexMap.put(measurements[i], i);
1✔
284
        continue;
1✔
285
      }
286
      for (int j = 0, m = schemas.length; j < m; j++) {
×
287
        if (schemas[j] != null && schemas[j].getMeasurementId().equals(measurements[i])) {
×
288
          indexMap.put(measurements[i], j);
×
289
          break;
×
290
        }
291
      }
292
    }
293
    return indexMap;
1✔
294
  }
295

296
  private Map<String, List<String>> constructTriggerNameToMeasurementListMap(
297
      InsertNode node, TriggerEvent event) {
298
    PartialPath device = node.getDevicePath();
1✔
299
    List<String> measurements = new ArrayList<>();
1✔
300
    for (String measurement : node.getMeasurements()) {
1✔
301
      if (measurement != null) {
1✔
302
        measurements.add(measurement);
1✔
303
      }
304
    }
305

306
    List<List<String>> triggerNameLists =
307
        TriggerManagementService.getInstance().getMatchedTriggerListForPath(device, measurements);
1✔
308
    boolean isAllEmpty = true;
1✔
309
    for (List<String> triggerNameList : triggerNameLists) {
1✔
310
      if (!triggerNameList.isEmpty()) {
1✔
311
        isAllEmpty = false;
1✔
312
        break;
1✔
313
      }
314
    }
×
315
    if (isAllEmpty) {
1✔
316
      return Collections.emptyMap();
×
317
    }
318

319
    Map<String, List<String>> triggerNameToPaths = new HashMap<>();
1✔
320
    TriggerTable triggerTable = TriggerManagementService.getInstance().getTriggerTable();
1✔
321
    for (int i = 0, n = measurements.size(); i < n; i++) {
1✔
322
      for (String triggerName : triggerNameLists.get(i)) {
1✔
323
        TriggerInformation triggerInformation = triggerTable.getTriggerInformation(triggerName);
1✔
324
        if (triggerInformation.getEvent().equals(event)
1✔
325
            && triggerInformation.getTriggerState().equals(TTriggerState.ACTIVE)) {
1✔
326
          triggerNameToPaths
1✔
327
              .computeIfAbsent(triggerName, k -> new ArrayList<>())
1✔
328
              .add(measurements.get(i));
1✔
329
        }
330
      }
1✔
331
    }
332
    return triggerNameToPaths;
1✔
333
  }
334

335
  private TriggerFireResult fire(String triggerName, Tablet tablet, TriggerEvent event) {
336
    TriggerFireResult result = TriggerFireResult.SUCCESS;
1✔
337
    for (int i = 0; i < FIRE_RETRY_NUM; i++) {
1✔
338
      if (TriggerManagementService.getInstance().needToFireOnAnotherDataNode(triggerName)) {
1✔
339
        TDataNodeLocation tDataNodeLocation =
340
            TriggerManagementService.getInstance()
×
341
                .getDataNodeLocationOfStatefulTrigger(triggerName);
×
342
        try (SyncDataNodeInternalServiceClient client =
343
            Coordinator.getInstance()
×
344
                .getInternalServiceClientManager()
×
345
                .borrowClient(tDataNodeLocation.getInternalEndPoint())) {
×
346
          TFireTriggerReq req = new TFireTriggerReq(triggerName, tablet.serialize(), event.getId());
×
347
          TFireTriggerResp resp = client.fireTrigger(req);
×
348
          if (resp.foundExecutor) {
×
349
            // we successfully found an executor on another data node
350
            return TriggerFireResult.construct(resp.getFireResult());
×
351
          } else {
352
            // update TDataNodeLocation of stateful trigger through config node
353
            if (!updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId())) {
×
354
              // if TDataNodeLocation is still the same, sleep 1s and before the retry
355
              Thread.sleep(4000);
×
356
            }
357
          }
358
        } catch (ClientManagerException | TException e) {
×
359
          // IOException means that we failed to borrow client, possibly because corresponding
360
          // DataNode is down.
361
          // TException means there's a timeout or broken connection.
362
          // We need to update local TriggerTable with the new TDataNodeLocation of the stateful
363
          // trigger.
364
          LOGGER.warn(
×
365
              "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
366
              triggerName,
367
              tDataNodeLocation.getInternalEndPoint(),
×
368
              e);
369
          // update TDataNodeLocation of stateful trigger through config node
370
          updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId());
×
371
        } catch (InterruptedException e) {
×
372
          LOGGER.warn("{} interrupted when sleep", triggerName);
×
373
          Thread.currentThread().interrupt();
×
374
        } catch (Exception e) {
×
375
          LOGGER.warn(
×
376
              "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
377
              triggerName,
378
              tDataNodeLocation.getInternalEndPoint(),
×
379
              e);
380
          // do not retry if it is not due to bad network or no executor found
381
          return TriggerManagementService.getInstance()
×
382
                  .getTriggerInformation(triggerName)
×
383
                  .getFailureStrategy()
×
384
                  .equals(FailureStrategy.OPTIMISTIC)
×
385
              ? TriggerFireResult.FAILED_NO_TERMINATION
×
386
              : TriggerFireResult.TERMINATION;
×
387
        }
×
388
      } else {
×
389
        TriggerExecutor executor = TriggerManagementService.getInstance().getExecutor(triggerName);
1✔
390
        if (executor == null) {
1✔
391
          return TriggerManagementService.getInstance()
×
392
                  .getTriggerInformation(triggerName)
×
393
                  .getFailureStrategy()
×
394
                  .equals(FailureStrategy.PESSIMISTIC)
×
395
              ? TriggerFireResult.TERMINATION
×
396
              : TriggerFireResult.FAILED_NO_TERMINATION;
×
397
        }
398
        try {
399
          boolean fireResult = executor.fire(tablet, event);
1✔
400
          if (!fireResult) {
1✔
401
            result =
402
                executor.getFailureStrategy().equals(FailureStrategy.PESSIMISTIC)
×
403
                    ? TriggerFireResult.TERMINATION
×
404
                    : TriggerFireResult.FAILED_NO_TERMINATION;
×
405
          }
406
        } catch (TriggerExecutionException e) {
×
407
          result =
408
              executor.getFailureStrategy().equals(FailureStrategy.PESSIMISTIC)
×
409
                  ? TriggerFireResult.TERMINATION
×
410
                  : TriggerFireResult.FAILED_NO_TERMINATION;
×
411
        }
1✔
412
        return result;
1✔
413
      }
414
    }
415
    return result;
×
416
  }
417

418
  /** Return true if the config node returns a new TDataNodeLocation. */
419
  private boolean updateLocationOfStatefulTrigger(String triggerName, int currentDataNodeId) {
420
    try (ConfigNodeClient configNodeClient =
×
421
        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
422
      TDataNodeLocation newTDataNodeLocation =
×
423
          configNodeClient.getLocationOfStatefulTrigger(triggerName).getDataNodeLocation();
×
424
      if (newTDataNodeLocation != null
×
425
          && currentDataNodeId != newTDataNodeLocation.getDataNodeId()) {
×
426
        // indicates that the location of this stateful trigger has changed
427
        TriggerManagementService.getInstance()
×
428
            .updateLocationOfStatefulTrigger(triggerName, newTDataNodeLocation);
×
429
        return true;
×
430
      }
431
      return false;
×
432
    } catch (ClientManagerException | TException | IOException e) {
×
433
      LOGGER.error(
×
434
          "Failed to update location of stateful trigger({}) through config node. The cause is {}.",
435
          triggerName,
436
          e);
437
      return false;
×
438
    }
439
  }
440
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc