• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #194

10 Oct 2023 03:10PM UTC coverage: 77.401% (+0.005%) from 77.396%
#194

push

github-actions

web-flow
Reset lastHandledEventId on speculative WFT (#1881)

3 of 3 new or added lines in 1 file covered. (100.0%)

18697 of 24156 relevant lines covered (77.4%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.25
/temporal-sdk/src/main/java/io/temporal/internal/replay/ServiceWorkflowHistoryIterator.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.protobuf.ByteString;
26
import com.uber.m3.tally.Scope;
27
import io.grpc.Deadline;
28
import io.grpc.Status;
29
import io.grpc.StatusRuntimeException;
30
import io.temporal.api.history.v1.History;
31
import io.temporal.api.history.v1.HistoryEvent;
32
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
33
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
34
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
35
import io.temporal.internal.retryer.GrpcRetryer;
36
import io.temporal.serviceclient.RpcRetryOptions;
37
import io.temporal.serviceclient.WorkflowServiceStubs;
38
import java.time.Duration;
39
import java.util.Iterator;
40
import java.util.NoSuchElementException;
41

42
/** Supports iteration over history while loading new pages through calls to the service. */
43
class ServiceWorkflowHistoryIterator implements WorkflowHistoryIterator {
44

45
  private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
1✔
46
  private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
1✔
47
  public final WorkflowServiceStubs service;
48
  private final String namespace;
49
  private final Scope metricsScope;
50
  private final PollWorkflowTaskQueueResponseOrBuilder task;
51
  private final GrpcRetryer grpcRetryer;
52
  private Deadline deadline;
53
  private Iterator<HistoryEvent> current;
54
  ByteString nextPageToken;
55

56
  ServiceWorkflowHistoryIterator(
57
      WorkflowServiceStubs service,
58
      String namespace,
59
      PollWorkflowTaskQueueResponseOrBuilder task,
60
      Scope metricsScope) {
1✔
61
    this.service = service;
1✔
62
    this.namespace = namespace;
1✔
63
    this.task = task;
1✔
64
    this.metricsScope = metricsScope;
1✔
65
    // TODO Refactor WorkflowHistoryIteratorTest or WorkflowHistoryIterator to remove this check.
66
    //  `service == null` shouldn't be allowed as it's needed for a normal functioning of this
67
    // class.
68
    this.grpcRetryer = service != null ? new GrpcRetryer(service.getServerCapabilities()) : null;
1✔
69
    History history = task.getHistory();
1✔
70
    current = history.getEventsList().iterator();
1✔
71
    nextPageToken = task.getNextPageToken();
1✔
72
  }
1✔
73

74
  // Returns true if more history events are available.
75
  @Override
76
  public boolean hasNext() {
77
    if (current.hasNext()) {
1✔
78
      return true;
1✔
79
    }
80
    while (!nextPageToken.isEmpty()) {
1✔
81
      // Server can return page tokens that point to empty pages.
82
      // We need to verify that page is valid before returning true.
83
      // Otherwise, next() method would throw NoSuchElementException after hasNext() returning
84
      // true.
85
      GetWorkflowExecutionHistoryResponse response = queryWorkflowExecutionHistory();
1✔
86

87
      current = response.getHistory().getEventsList().iterator();
1✔
88
      nextPageToken = response.getNextPageToken();
1✔
89
      // Server can return an empty page, but a valid nextPageToken that contains
90
      // more events.
91
      if (current.hasNext()) {
1✔
92
        return true;
1✔
93
      }
94
    }
1✔
95
    return false;
1✔
96
  }
97

98
  @Override
99
  public HistoryEvent next() {
100
    if (hasNext()) {
1✔
101
      return current.next();
1✔
102
    }
103
    throw new NoSuchElementException();
1✔
104
  }
105

106
  public void initDeadline(Deadline deadline) {
107
    this.deadline = deadline;
1✔
108
  }
1✔
109

110
  GetWorkflowExecutionHistoryResponse queryWorkflowExecutionHistory() {
111
    RpcRetryOptions retryOptions =
112
        RpcRetryOptions.newBuilder()
×
113
            .setInitialInterval(retryServiceOperationInitialInterval)
×
114
            .setMaximumInterval(retryServiceOperationMaxInterval)
×
115
            .validateBuildWithDefaults();
×
116
    GrpcRetryer.GrpcRetryerOptions grpcRetryerOptions =
×
117
        new GrpcRetryer.GrpcRetryerOptions(retryOptions, deadline);
118
    GetWorkflowExecutionHistoryRequest request =
119
        GetWorkflowExecutionHistoryRequest.newBuilder()
×
120
            .setNamespace(namespace)
×
121
            .setExecution(task.getWorkflowExecution())
×
122
            .setNextPageToken(nextPageToken)
×
123
            .build();
×
124
    try {
125
      return grpcRetryer.retryWithResult(
×
126
          () ->
127
              service
128
                  .blockingStub()
×
129
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
130
                  .getWorkflowExecutionHistory(request),
×
131
          grpcRetryerOptions);
132
    } catch (StatusRuntimeException ex) {
×
133
      if (Status.DEADLINE_EXCEEDED.equals(ex.getStatus())) {
×
134
        throw Status.DEADLINE_EXCEEDED
×
135
            .withDescription(
×
136
                "getWorkflowExecutionHistory pagination took longer than workflow task timeout")
137
            .withCause(ex)
×
138
            .asRuntimeException();
×
139
      }
140
      throw ex;
×
141
    }
142
  }
143
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc