• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #174

pending completion
#174

push

github-actions

web-flow
Add schedules API (#1776)

Add schedules API

1143 of 1143 new or added lines in 35 files covered. (100.0%)

18101 of 23284 relevant lines covered (77.74%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.8
/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.client.external;
22

23
import static io.temporal.serviceclient.MetricsTag.HISTORY_LONG_POLL_CALL_OPTIONS_KEY;
24
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
25

26
import com.google.common.util.concurrent.ListenableFuture;
27
import com.uber.m3.tally.Scope;
28
import com.uber.m3.util.ImmutableMap;
29
import io.grpc.Deadline;
30
import io.temporal.api.workflowservice.v1.*;
31
import io.temporal.internal.retryer.GrpcRetryer;
32
import io.temporal.serviceclient.MetricsTag;
33
import io.temporal.serviceclient.RpcRetryOptions;
34
import io.temporal.serviceclient.WorkflowServiceStubs;
35
import io.temporal.serviceclient.rpcretry.DefaultStubLongPollRpcRetryOptions;
36
import java.util.Map;
37
import java.util.concurrent.*;
38
import javax.annotation.Nonnull;
39

40
public final class GenericWorkflowClientImpl implements GenericWorkflowClient {
41

42
  // TODO we need to shutdown this executor
43
  private static final ScheduledExecutorService asyncThrottlerExecutor =
1✔
44
      new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "generic-wf-client-async-throttler"));
1✔
45

46
  /** Service stubs used to obtain the blocking and future gRPC stubs for every call. */
  private final WorkflowServiceStubs service;

  /** Base metrics scope attached to each call, optionally with per-call tags added. */
  private final Scope metricsScope;

  /** Retryer through which every RPC in this class is issued. */
  private final GrpcRetryer grpcRetryer;

  /** Retry options shared by all calls that do not carry their own deadline. */
  private final GrpcRetryer.GrpcRetryerOptions grpcRetryerOptions;

  /**
   * Creates a client bound to the given service stubs and metrics scope.
   *
   * <p>The shared retry options are built with defaults taken from the RPC retry options
   * configured on {@code service}, and are created with a {@code null} second argument.
   *
   * @param service service stubs used for all calls
   * @param metricsScope metrics scope passed to the stubs as a call option
   */
  public GenericWorkflowClientImpl(WorkflowServiceStubs service, Scope metricsScope) {
    this.service = service;
    this.metricsScope = metricsScope;

    RpcRetryOptions.Builder retryOptionsBuilder = RpcRetryOptions.newBuilder();
    RpcRetryOptions effectiveRetryOptions =
        retryOptionsBuilder.buildWithDefaultsFrom(service.getOptions().getRpcRetryOptions());

    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
    this.grpcRetryerOptions = new GrpcRetryer.GrpcRetryerOptions(effectiveRetryOptions, null);
  }
1✔
60

61
  @Override
62
  public StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request) {
63
    Map<String, String> tags =
1✔
64
        new ImmutableMap.Builder<String, String>(2)
65
            .put(MetricsTag.WORKFLOW_TYPE, request.getWorkflowType().getName())
1✔
66
            .put(MetricsTag.TASK_QUEUE, request.getTaskQueue().getName())
1✔
67
            .build();
1✔
68
    Scope scope = metricsScope.tagged(tags);
1✔
69
    return grpcRetryer.retryWithResult(
1✔
70
        () ->
71
            service
72
                .blockingStub()
1✔
73
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, scope)
1✔
74
                .startWorkflowExecution(request),
1✔
75
        grpcRetryerOptions);
76
  }
77

78
  @Override
79
  public void signal(SignalWorkflowExecutionRequest request) {
80
    Map<String, String> tags =
1✔
81
        new ImmutableMap.Builder<String, String>(1)
82
            .put(MetricsTag.SIGNAL_NAME, request.getSignalName())
1✔
83
            .build();
1✔
84
    Scope scope = metricsScope.tagged(tags);
1✔
85
    grpcRetryer.retry(
1✔
86
        () ->
87
            service
88
                .blockingStub()
1✔
89
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, scope)
1✔
90
                .signalWorkflowExecution(request),
1✔
91
        grpcRetryerOptions);
92
  }
1✔
93

94
  @Override
95
  public SignalWithStartWorkflowExecutionResponse signalWithStart(
96
      SignalWithStartWorkflowExecutionRequest request) {
97
    Map<String, String> tags =
1✔
98
        new ImmutableMap.Builder<String, String>(2)
99
            .put(MetricsTag.WORKFLOW_TYPE, request.getWorkflowType().getName())
1✔
100
            .put(MetricsTag.TASK_QUEUE, request.getTaskQueue().getName())
1✔
101
            .put(MetricsTag.SIGNAL_NAME, request.getSignalName())
1✔
102
            .build();
1✔
103
    Scope scope = metricsScope.tagged(tags);
1✔
104

105
    return grpcRetryer.retryWithResult(
1✔
106
        () ->
107
            service
108
                .blockingStub()
1✔
109
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, scope)
1✔
110
                .signalWithStartWorkflowExecution(request),
1✔
111
        grpcRetryerOptions);
112
  }
113

114
  @Override
115
  public void requestCancel(RequestCancelWorkflowExecutionRequest request) {
116
    grpcRetryer.retry(
1✔
117
        () ->
118
            service
119
                .blockingStub()
1✔
120
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
121
                .requestCancelWorkflowExecution(request),
1✔
122
        grpcRetryerOptions);
123
  }
1✔
124

125
  @Override
126
  public void terminate(TerminateWorkflowExecutionRequest request) {
127
    grpcRetryer.retry(
1✔
128
        () ->
129
            service
130
                .blockingStub()
1✔
131
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
132
                .terminateWorkflowExecution(request),
1✔
133
        grpcRetryerOptions);
134
  }
1✔
135

136
  @Override
137
  public GetWorkflowExecutionHistoryResponse longPollHistory(
138
      @Nonnull GetWorkflowExecutionHistoryRequest request, @Nonnull Deadline deadline) {
139
    return grpcRetryer.retryWithResult(
1✔
140
        () ->
141
            service
142
                .blockingStub()
1✔
143
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
144
                .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true)
1✔
145
                .withDeadline(deadline)
1✔
146
                .getWorkflowExecutionHistory(request),
1✔
147
        new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline));
148
  }
149

150
  @Override
151
  public CompletableFuture<GetWorkflowExecutionHistoryResponse> longPollHistoryAsync(
152
      @Nonnull GetWorkflowExecutionHistoryRequest request, @Nonnull Deadline deadline) {
153
    return grpcRetryer.retryWithResultAsync(
1✔
154
        asyncThrottlerExecutor,
155
        () ->
156
            toCompletableFuture(
1✔
157
                service
158
                    .futureStub()
1✔
159
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
160
                    .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true)
1✔
161
                    .withDeadline(deadline)
1✔
162
                    .getWorkflowExecutionHistory(request)),
1✔
163
        new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline));
164
  }
165

166
  @Override
167
  public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
168
      @Nonnull GetWorkflowExecutionHistoryRequest request) {
169
    return grpcRetryer.retryWithResult(
×
170
        () ->
171
            service
172
                .blockingStub()
×
173
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
174
                .getWorkflowExecutionHistory(request),
×
175
        grpcRetryerOptions);
176
  }
177

178
  @Override
179
  public CompletableFuture<GetWorkflowExecutionHistoryResponse> getWorkflowExecutionHistoryAsync(
180
      @Nonnull GetWorkflowExecutionHistoryRequest request) {
181
    return grpcRetryer.retryWithResultAsync(
1✔
182
        asyncThrottlerExecutor,
183
        () ->
184
            toCompletableFuture(
1✔
185
                service
186
                    .futureStub()
1✔
187
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
188
                    .getWorkflowExecutionHistory(request)),
1✔
189
        grpcRetryerOptions);
190
  }
191

192
  @Override
193
  public QueryWorkflowResponse query(QueryWorkflowRequest queryParameters) {
194
    Map<String, String> tags =
1✔
195
        new ImmutableMap.Builder<String, String>(1)
196
            .put(MetricsTag.QUERY_TYPE, queryParameters.getQuery().getQueryType())
1✔
197
            .build();
1✔
198
    Scope scope = metricsScope.tagged(tags);
1✔
199

200
    return grpcRetryer.retryWithResult(
1✔
201
        () ->
202
            service
203
                .blockingStub()
1✔
204
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, scope)
1✔
205
                .queryWorkflow(queryParameters),
1✔
206
        grpcRetryerOptions);
207
  }
208

209
  @Override
210
  public ListWorkflowExecutionsResponse listWorkflowExecutions(
211
      ListWorkflowExecutionsRequest listRequest) {
212
    return grpcRetryer.retryWithResult(
×
213
        () ->
214
            service
215
                .blockingStub()
×
216
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
217
                .listWorkflowExecutions(listRequest),
×
218
        grpcRetryerOptions);
219
  }
220

221
  @Override
222
  public CompletableFuture<ListWorkflowExecutionsResponse> listWorkflowExecutionsAsync(
223
      ListWorkflowExecutionsRequest listRequest) {
224
    return grpcRetryer.retryWithResultAsync(
×
225
        asyncThrottlerExecutor,
226
        () ->
227
            toCompletableFuture(
×
228
                service
229
                    .futureStub()
×
230
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
231
                    .listWorkflowExecutions(listRequest)),
×
232
        grpcRetryerOptions);
233
  }
234

235
  @Override
236
  public CreateScheduleResponse createSchedule(CreateScheduleRequest request) {
237
    return grpcRetryer.retryWithResult(
×
238
        () ->
239
            service
240
                .blockingStub()
×
241
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
242
                .createSchedule(request),
×
243
        grpcRetryerOptions);
244
  }
245

246
  @Override
247
  public CompletableFuture<ListSchedulesResponse> listSchedulesAsync(ListSchedulesRequest request) {
248
    return grpcRetryer.retryWithResultAsync(
×
249
        asyncThrottlerExecutor,
250
        () ->
251
            toCompletableFuture(
×
252
                service
253
                    .futureStub()
×
254
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
255
                    .listSchedules(request)),
×
256
        grpcRetryerOptions);
257
  }
258

259
  @Override
260
  public UpdateScheduleResponse updateSchedule(UpdateScheduleRequest request) {
261
    return grpcRetryer.retryWithResult(
×
262
        () ->
263
            service
264
                .blockingStub()
×
265
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
266
                .updateSchedule(request),
×
267
        grpcRetryerOptions);
268
  }
269

270
  @Override
271
  public PatchScheduleResponse patchSchedule(PatchScheduleRequest request) {
272
    return grpcRetryer.retryWithResult(
×
273
        () ->
274
            service
275
                .blockingStub()
×
276
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
277
                .patchSchedule(request),
×
278
        grpcRetryerOptions);
279
  }
280

281
  @Override
282
  public DeleteScheduleResponse deleteSchedule(DeleteScheduleRequest request) {
283
    return grpcRetryer.retryWithResult(
×
284
        () ->
285
            service
286
                .blockingStub()
×
287
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
288
                .deleteSchedule(request),
×
289
        grpcRetryerOptions);
290
  }
291

292
  @Override
293
  public DescribeScheduleResponse describeSchedule(DescribeScheduleRequest request) {
294
    return grpcRetryer.retryWithResult(
×
295
        () ->
296
            service
297
                .blockingStub()
×
298
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
299
                .describeSchedule(request),
×
300
        grpcRetryerOptions);
301
  }
302

303
  private static <T> CompletableFuture<T> toCompletableFuture(
304
      ListenableFuture<T> listenableFuture) {
305
    CompletableFuture<T> result = new CompletableFuture<>();
1✔
306
    listenableFuture.addListener(
1✔
307
        () -> {
308
          try {
309
            result.complete(listenableFuture.get());
1✔
310
          } catch (ExecutionException e) {
1✔
311
            result.completeExceptionally(e.getCause());
1✔
312
          } catch (Exception e) {
×
313
            result.completeExceptionally(e);
×
314
          }
1✔
315
        },
1✔
316
        ForkJoinPool.commonPool());
1✔
317
    return result;
1✔
318
  }
319

320
  @Override
321
  public UpdateWorkflowExecutionResponse update(UpdateWorkflowExecutionRequest updateParameters) {
322
    Map<String, String> tags =
1✔
323
        new ImmutableMap.Builder<String, String>(1)
324
            .put(MetricsTag.UPDATE_NAME, updateParameters.getRequest().getInput().getName())
1✔
325
            .build();
1✔
326
    Scope scope = metricsScope.tagged(tags);
1✔
327

328
    return grpcRetryer.retryWithResult(
1✔
329
        () ->
330
            service
331
                .blockingStub()
1✔
332
                .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, scope)
1✔
333
                .updateWorkflowExecution(updateParameters),
1✔
334
        grpcRetryerOptions);
335
  }
336

337
  @Override
338
  public CompletableFuture<PollWorkflowExecutionUpdateResponse> pollUpdateAsync(
339
      @Nonnull PollWorkflowExecutionUpdateRequest request, @Nonnull Deadline deadline) {
340
    return grpcRetryer.retryWithResultAsync(
1✔
341
        asyncThrottlerExecutor,
342
        () ->
343
            toCompletableFuture(
1✔
344
                service
345
                    .futureStub()
1✔
346
                    .withDeadline(deadline)
1✔
347
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
348
                    .pollWorkflowExecutionUpdate(request)),
1✔
349
        new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline));
350
  }
351
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc