• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #175

pending completion
#175

push

github-actions

web-flow
Worker / Build Id versioning (#1786)

Implement new worker build id based versioning feature

236 of 236 new or added lines in 24 files covered. (100.0%)

18343 of 23697 relevant lines covered (77.41%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.76
/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.client;
22

23
import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Strings;
27
import com.google.common.collect.Iterators;
28
import com.google.common.reflect.TypeToken;
29
import com.uber.m3.tally.Scope;
30
import io.temporal.api.common.v1.WorkflowExecution;
31
import io.temporal.api.history.v1.History;
32
import io.temporal.api.history.v1.HistoryEvent;
33
import io.temporal.api.workflowservice.v1.GetWorkerBuildIdCompatibilityRequest;
34
import io.temporal.api.workflowservice.v1.GetWorkerBuildIdCompatibilityResponse;
35
import io.temporal.api.workflowservice.v1.UpdateWorkerBuildIdCompatibilityRequest;
36
import io.temporal.client.WorkflowInvocationHandler.InvocationType;
37
import io.temporal.common.WorkflowExecutionHistory;
38
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
39
import io.temporal.common.interceptors.WorkflowClientInterceptor;
40
import io.temporal.internal.WorkflowThreadMarker;
41
import io.temporal.internal.client.RootWorkflowClientInvoker;
42
import io.temporal.internal.client.WorkerFactoryRegistry;
43
import io.temporal.internal.client.WorkflowClientInternal;
44
import io.temporal.internal.client.external.GenericWorkflowClient;
45
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
46
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
47
import io.temporal.internal.sync.StubMarker;
48
import io.temporal.serviceclient.MetricsTag;
49
import io.temporal.serviceclient.WorkflowServiceStubs;
50
import io.temporal.worker.WorkerFactory;
51
import io.temporal.workflow.Functions;
52
import io.temporal.workflow.QueryMethod;
53
import io.temporal.workflow.SignalMethod;
54
import io.temporal.workflow.WorkflowMethod;
55
import java.lang.annotation.Annotation;
56
import java.lang.reflect.Method;
57
import java.lang.reflect.Proxy;
58
import java.util.*;
59
import java.util.concurrent.CompletableFuture;
60
import java.util.stream.Collectors;
61
import java.util.stream.Stream;
62
import java.util.stream.StreamSupport;
63
import javax.annotation.Nonnull;
64
import javax.annotation.Nullable;
65

66
final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClientInternal {
67

68
  private final GenericWorkflowClient genericClient;
69
  private final WorkflowClientOptions options;
70
  private final ManualActivityCompletionClientFactory manualActivityCompletionClientFactory;
71
  private final WorkflowClientCallsInterceptor workflowClientCallsInvoker;
72
  private final WorkflowServiceStubs workflowServiceStubs;
73
  private final Scope metricsScope;
74
  private final WorkflowClientInterceptor[] interceptors;
75
  private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry();
1✔
76

77
  /**
78
   * Creates client that connects to an instance of the Temporal Service. Cannot be used from within
79
   * workflow code.
80
   *
81
   * @param service client to the Temporal Service endpoint.
82
   * @param options Options (like {@link io.temporal.common.converter.DataConverter} override) for
83
   *     configuring client.
84
   */
85
  public static WorkflowClient newInstance(
86
      WorkflowServiceStubs service, WorkflowClientOptions options) {
87
    enforceNonWorkflowThread();
1✔
88
    return WorkflowThreadMarker.protectFromWorkflowThread(
1✔
89
        new WorkflowClientInternalImpl(service, options), WorkflowClient.class);
90
  }
91

92
  WorkflowClientInternalImpl(
93
      WorkflowServiceStubs workflowServiceStubs, WorkflowClientOptions options) {
1✔
94
    options = WorkflowClientOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
95
    this.options = options;
1✔
96
    this.workflowServiceStubs = workflowServiceStubs;
1✔
97
    this.metricsScope =
1✔
98
        workflowServiceStubs
99
            .getOptions()
1✔
100
            .getMetricsScope()
1✔
101
            .tagged(MetricsTag.defaultTags(options.getNamespace()));
1✔
102
    this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope);
1✔
103
    this.interceptors = options.getInterceptors();
1✔
104
    this.workflowClientCallsInvoker = initializeClientInvoker();
1✔
105
    this.manualActivityCompletionClientFactory =
1✔
106
        ManualActivityCompletionClientFactory.newFactory(
1✔
107
            workflowServiceStubs,
108
            options.getNamespace(),
1✔
109
            options.getIdentity(),
1✔
110
            options.getDataConverter());
1✔
111
  }
1✔
112

113
  private WorkflowClientCallsInterceptor initializeClientInvoker() {
114
    WorkflowClientCallsInterceptor workflowClientInvoker =
1✔
115
        new RootWorkflowClientInvoker(genericClient, options, workerFactoryRegistry);
116
    for (WorkflowClientInterceptor clientInterceptor : interceptors) {
1✔
117
      workflowClientInvoker =
1✔
118
          clientInterceptor.workflowClientCallsInterceptor(workflowClientInvoker);
1✔
119
    }
120
    return workflowClientInvoker;
1✔
121
  }
122

123
  @Override
124
  public WorkflowServiceStubs getWorkflowServiceStubs() {
125
    return workflowServiceStubs;
1✔
126
  }
127

128
  @Override
129
  public WorkflowClientOptions getOptions() {
130
    return options;
1✔
131
  }
132

133
  @Override
134
  @SuppressWarnings("unchecked")
135
  public <T> T newWorkflowStub(Class<T> workflowInterface, WorkflowOptions options) {
136
    checkAnnotation(workflowInterface, WorkflowMethod.class);
1✔
137
    WorkflowInvocationHandler invocationHandler =
1✔
138
        new WorkflowInvocationHandler(
139
            workflowInterface, this.getOptions(), workflowClientCallsInvoker, options);
1✔
140
    return (T)
1✔
141
        Proxy.newProxyInstance(
1✔
142
            workflowInterface.getClassLoader(),
1✔
143
            new Class<?>[] {workflowInterface, StubMarker.class},
144
            invocationHandler);
145
  }
146

147
  @SafeVarargs
148
  private static <T> void checkAnnotation(
149
      Class<T> workflowInterface, Class<? extends Annotation>... annotationClasses) {
150
    TypeToken<?>.TypeSet interfaces = TypeToken.of(workflowInterface).getTypes().interfaces();
1✔
151
    if (interfaces.isEmpty()) {
1✔
152
      throw new IllegalArgumentException("Workflow must implement at least one interface");
×
153
    }
154
    for (TypeToken<?> i : interfaces) {
1✔
155
      for (Method method : i.getRawType().getMethods()) {
1✔
156
        for (Class<? extends Annotation> annotationClass : annotationClasses) {
1✔
157
          Object workflowMethod = method.getAnnotation(annotationClass);
1✔
158
          if (workflowMethod != null) {
1✔
159
            return;
1✔
160
          }
161
        }
162
      }
163
    }
×
164
    throw new IllegalArgumentException(
×
165
        "Workflow interface "
166
            + workflowInterface.getName()
×
167
            + " doesn't have method annotated with any of "
168
            + Arrays.toString(annotationClasses));
×
169
  }
170

171
  @Override
172
  public <T> T newWorkflowStub(Class<T> workflowInterface, String workflowId) {
173
    return newWorkflowStub(workflowInterface, workflowId, Optional.empty());
1✔
174
  }
175

176
  @Override
177
  public <T> T newWorkflowStub(
178
      Class<T> workflowInterface, String workflowId, Optional<String> runId) {
179
    checkAnnotation(workflowInterface, WorkflowMethod.class, QueryMethod.class, SignalMethod.class);
1✔
180
    if (Strings.isNullOrEmpty(workflowId)) {
1✔
181
      throw new IllegalArgumentException("workflowId is null or empty");
×
182
    }
183
    WorkflowExecution execution =
184
        WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(runId.orElse("")).build();
1✔
185

186
    WorkflowInvocationHandler invocationHandler =
1✔
187
        new WorkflowInvocationHandler(
188
            workflowInterface, this.getOptions(), workflowClientCallsInvoker, execution);
1✔
189
    @SuppressWarnings("unchecked")
190
    T result =
1✔
191
        (T)
192
            Proxy.newProxyInstance(
1✔
193
                workflowInterface.getClassLoader(),
1✔
194
                new Class<?>[] {workflowInterface, StubMarker.class},
195
                invocationHandler);
196
    return result;
1✔
197
  }
198

199
  @Override
200
  public WorkflowStub newUntypedWorkflowStub(String workflowId) {
201
    return newUntypedWorkflowStub(workflowId, Optional.empty(), Optional.empty());
1✔
202
  }
203

204
  @Override
205
  @SuppressWarnings("deprecation")
206
  public WorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions workflowOptions) {
207
    WorkflowStub result =
1✔
208
        new WorkflowStubImpl(options, workflowClientCallsInvoker, workflowType, workflowOptions);
209
    for (WorkflowClientInterceptor i : interceptors) {
1✔
210
      result = i.newUntypedWorkflowStub(workflowType, workflowOptions, result);
1✔
211
    }
212
    return result;
1✔
213
  }
214

215
  @Override
216
  public WorkflowStub newUntypedWorkflowStub(
217
      String workflowId, Optional<String> runId, Optional<String> workflowType) {
218
    WorkflowExecution execution =
219
        WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(runId.orElse("")).build();
1✔
220
    return newUntypedWorkflowStub(execution, workflowType);
1✔
221
  }
222

223
  @Override
224
  @SuppressWarnings("deprecation")
225
  public WorkflowStub newUntypedWorkflowStub(
226
      WorkflowExecution execution, Optional<String> workflowType) {
227
    WorkflowStub result =
1✔
228
        new WorkflowStubImpl(options, workflowClientCallsInvoker, workflowType, execution);
229
    for (WorkflowClientInterceptor i : interceptors) {
1✔
230
      result = i.newUntypedWorkflowStub(execution, workflowType, result);
1✔
231
    }
232
    return result;
1✔
233
  }
234

235
  @Override
236
  public ActivityCompletionClient newActivityCompletionClient() {
237
    ActivityCompletionClient result =
1✔
238
        WorkflowThreadMarker.protectFromWorkflowThread(
1✔
239
            new ActivityCompletionClientImpl(
240
                manualActivityCompletionClientFactory, () -> {}, metricsScope, null),
1✔
241
            ActivityCompletionClient.class);
242
    for (WorkflowClientInterceptor i : interceptors) {
1✔
243
      result = i.newActivityCompletionClient(result);
1✔
244
    }
245
    return result;
1✔
246
  }
247

248
  @Override
249
  public BatchRequest newSignalWithStartRequest() {
250
    return new SignalWithStartBatchRequest();
1✔
251
  }
252

253
  @Override
254
  public WorkflowExecution signalWithStart(BatchRequest signalWithStartBatch) {
255
    return ((SignalWithStartBatchRequest) signalWithStartBatch).invoke();
1✔
256
  }
257

258
  @Override
259
  public Stream<WorkflowExecutionMetadata> listExecutions(@Nullable String query) {
260
    return listExecutions(query, null);
×
261
  }
262

263
  Stream<WorkflowExecutionMetadata> listExecutions(
264
      @Nullable String query, @Nullable Integer pageSize) {
265
    ListWorkflowExecutionIterator iterator =
×
266
        new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient);
×
267
    iterator.init();
×
268
    Iterator<WorkflowExecutionMetadata> wrappedIterator =
×
269
        Iterators.transform(
×
270
            iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter()));
×
271

272
    // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
273
    // impossible
274
    //  TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
275
    // API
276
    // guarantees absence of duplicates
277
    final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
×
278

279
    return StreamSupport.stream(
×
280
        Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false);
×
281
  }
282

283
  @Override
284
  public Stream<HistoryEvent> streamHistory(@Nonnull String workflowId) {
285
    return streamHistory(workflowId, null);
×
286
  }
287

288
  @Override
289
  public Stream<HistoryEvent> streamHistory(@Nonnull String workflowId, @Nullable String runId) {
290
    Preconditions.checkNotNull(workflowId, "workflowId is required");
1✔
291

292
    WorkflowExecution.Builder executionBuilder =
293
        WorkflowExecution.newBuilder().setWorkflowId(workflowId);
1✔
294
    if (runId != null) {
1✔
295
      executionBuilder.setRunId(runId);
1✔
296
    }
297
    WorkflowExecution execution = executionBuilder.build();
1✔
298

299
    return streamHistory(execution);
1✔
300
  }
301

302
  @Override
303
  public WorkflowExecutionHistory fetchHistory(@Nonnull String workflowId) {
304
    return fetchHistory(workflowId, null);
1✔
305
  }
306

307
  @Override
308
  public WorkflowExecutionHistory fetchHistory(@Nonnull String workflowId, @Nullable String runId) {
309
    Preconditions.checkNotNull(workflowId, "execution is required");
1✔
310

311
    return new WorkflowExecutionHistory(
1✔
312
        History.newBuilder()
1✔
313
            .addAllEvents(streamHistory(workflowId, runId).collect(Collectors.toList()))
1✔
314
            .build(),
1✔
315
        workflowId);
316
  }
317

318
  @Override
319
  public void updateWorkerBuildIdCompatability(
320
      @Nonnull String taskQueue, @Nonnull BuildIdOperation operation) {
321
    UpdateWorkerBuildIdCompatibilityRequest.Builder reqBuilder =
322
        UpdateWorkerBuildIdCompatibilityRequest.newBuilder()
×
323
            .setTaskQueue(taskQueue)
×
324
            .setNamespace(options.getNamespace());
×
325
    operation.augmentBuilder(reqBuilder);
×
326
    genericClient.updateWorkerBuildIdCompatability(reqBuilder.build());
×
327
  }
×
328

329
  @Override
330
  public WorkerBuildIdVersionSets getWorkerBuildIdCompatability(@Nonnull String taskQueue) {
331
    GetWorkerBuildIdCompatibilityRequest req =
332
        GetWorkerBuildIdCompatibilityRequest.newBuilder()
×
333
            .setTaskQueue(taskQueue)
×
334
            .setNamespace(options.getNamespace())
×
335
            .build();
×
336
    GetWorkerBuildIdCompatibilityResponse resp = genericClient.getWorkerBuildIdCompatability(req);
×
337
    return new WorkerBuildIdVersionSets(resp);
×
338
  }
339

340
  public static WorkflowExecution start(Functions.Proc workflow) {
341
    enforceNonWorkflowThread();
1✔
342
    WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START);
1✔
343
    try {
344
      workflow.apply();
1✔
345
      return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowExecution.class);
1✔
346
    } finally {
347
      WorkflowInvocationHandler.closeAsyncInvocation();
1✔
348
    }
349
  }
350

351
  public static <A1> WorkflowExecution start(Functions.Proc1<A1> workflow, A1 arg1) {
352
    return start(() -> workflow.apply(arg1));
1✔
353
  }
354

355
  public static <A1, A2> WorkflowExecution start(
356
      Functions.Proc2<A1, A2> workflow, A1 arg1, A2 arg2) {
357
    return start(() -> workflow.apply(arg1, arg2));
1✔
358
  }
359

360
  public static <A1, A2, A3> WorkflowExecution start(
361
      Functions.Proc3<A1, A2, A3> workflow, A1 arg1, A2 arg2, A3 arg3) {
362
    return start(() -> workflow.apply(arg1, arg2, arg3));
1✔
363
  }
364

365
  public static <A1, A2, A3, A4> WorkflowExecution start(
366
      Functions.Proc4<A1, A2, A3, A4> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
367
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
368
  }
369

370
  public static <A1, A2, A3, A4, A5> WorkflowExecution start(
371
      Functions.Proc5<A1, A2, A3, A4, A5> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5) {
372
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
373
  }
374

375
  public static <A1, A2, A3, A4, A5, A6> WorkflowExecution start(
376
      Functions.Proc6<A1, A2, A3, A4, A5, A6> workflow,
377
      A1 arg1,
378
      A2 arg2,
379
      A3 arg3,
380
      A4 arg4,
381
      A5 arg5,
382
      A6 arg6) {
383
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
384
  }
385

386
  public static <R> WorkflowExecution start(Functions.Func<R> workflow) {
387
    return start((Functions.Proc) workflow::apply);
1✔
388
  }
389

390
  public static <A1, R> WorkflowExecution start(Functions.Func1<A1, R> workflow, A1 arg1) {
391
    return start(() -> workflow.apply(arg1));
1✔
392
  }
393

394
  public static <A1, A2, R> WorkflowExecution start(
395
      Functions.Func2<A1, A2, R> workflow, A1 arg1, A2 arg2) {
396
    return start(() -> workflow.apply(arg1, arg2));
1✔
397
  }
398

399
  public static <A1, A2, A3, R> WorkflowExecution start(
400
      Functions.Func3<A1, A2, A3, R> workflow, A1 arg1, A2 arg2, A3 arg3) {
401
    return start(() -> workflow.apply(arg1, arg2, arg3));
1✔
402
  }
403

404
  public static <A1, A2, A3, A4, R> WorkflowExecution start(
405
      Functions.Func4<A1, A2, A3, A4, R> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
406
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
407
  }
408

409
  public static <A1, A2, A3, A4, A5, R> WorkflowExecution start(
410
      Functions.Func5<A1, A2, A3, A4, A5, R> workflow,
411
      A1 arg1,
412
      A2 arg2,
413
      A3 arg3,
414
      A4 arg4,
415
      A5 arg5) {
416
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
417
  }
418

419
  public static <A1, A2, A3, A4, A5, A6, R> WorkflowExecution start(
420
      Functions.Func6<A1, A2, A3, A4, A5, A6, R> workflow,
421
      A1 arg1,
422
      A2 arg2,
423
      A3 arg3,
424
      A4 arg4,
425
      A5 arg5,
426
      A6 arg6) {
427
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
428
  }
429

430
  @SuppressWarnings("unchecked")
431
  public static CompletableFuture<Void> execute(Functions.Proc workflow) {
432
    enforceNonWorkflowThread();
1✔
433
    WorkflowInvocationHandler.initAsyncInvocation(InvocationType.EXECUTE);
1✔
434
    try {
435
      workflow.apply();
1✔
436
      return WorkflowInvocationHandler.getAsyncInvocationResult(CompletableFuture.class);
1✔
437
    } finally {
438
      WorkflowInvocationHandler.closeAsyncInvocation();
1✔
439
    }
440
  }
441

442
  public static <A1> CompletableFuture<Void> execute(Functions.Proc1<A1> workflow, A1 arg1) {
443
    return execute(() -> workflow.apply(arg1));
1✔
444
  }
445

446
  public static <A1, A2> CompletableFuture<Void> execute(
447
      Functions.Proc2<A1, A2> workflow, A1 arg1, A2 arg2) {
448
    return execute(() -> workflow.apply(arg1, arg2));
1✔
449
  }
450

451
  public static <A1, A2, A3> CompletableFuture<Void> execute(
452
      Functions.Proc3<A1, A2, A3> workflow, A1 arg1, A2 arg2, A3 arg3) {
453
    return execute(() -> workflow.apply(arg1, arg2, arg3));
1✔
454
  }
455

456
  public static <A1, A2, A3, A4> CompletableFuture<Void> execute(
457
      Functions.Proc4<A1, A2, A3, A4> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
458
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
459
  }
460

461
  public static <A1, A2, A3, A4, A5> CompletableFuture<Void> execute(
462
      Functions.Proc5<A1, A2, A3, A4, A5> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5) {
463
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
464
  }
465

466
  public static <A1, A2, A3, A4, A5, A6> CompletableFuture<Void> execute(
467
      Functions.Proc6<A1, A2, A3, A4, A5, A6> workflow,
468
      A1 arg1,
469
      A2 arg2,
470
      A3 arg3,
471
      A4 arg4,
472
      A5 arg5,
473
      A6 arg6) {
474
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
475
  }
476

477
  @SuppressWarnings("unchecked")
478
  public static <R> CompletableFuture<R> execute(Functions.Func<R> workflow) {
479
    return (CompletableFuture<R>) execute((Functions.Proc) workflow::apply);
1✔
480
  }
481

482
  public static <A1, R> CompletableFuture<R> execute(Functions.Func1<A1, R> workflow, A1 arg1) {
483
    return execute(() -> workflow.apply(arg1));
1✔
484
  }
485

486
  public static <A1, A2, R> CompletableFuture<R> execute(
487
      Functions.Func2<A1, A2, R> workflow, A1 arg1, A2 arg2) {
488
    return execute(() -> workflow.apply(arg1, arg2));
1✔
489
  }
490

491
  public static <A1, A2, A3, R> CompletableFuture<R> execute(
492
      Functions.Func3<A1, A2, A3, R> workflow, A1 arg1, A2 arg2, A3 arg3) {
493
    return execute(() -> workflow.apply(arg1, arg2, arg3));
1✔
494
  }
495

496
  public static <A1, A2, A3, A4, R> CompletableFuture<R> execute(
497
      Functions.Func4<A1, A2, A3, A4, R> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
498
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
499
  }
500

501
  public static <A1, A2, A3, A4, A5, R> CompletableFuture<R> execute(
502
      Functions.Func5<A1, A2, A3, A4, A5, R> workflow,
503
      A1 arg1,
504
      A2 arg2,
505
      A3 arg3,
506
      A4 arg4,
507
      A5 arg5) {
508
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
509
  }
510

511
  public static <A1, A2, A3, A4, A5, A6, R> CompletableFuture<R> execute(
512
      Functions.Func6<A1, A2, A3, A4, A5, A6, R> workflow,
513
      A1 arg1,
514
      A2 arg2,
515
      A3 arg3,
516
      A4 arg4,
517
      A5 arg5,
518
      A6 arg6) {
519
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
520
  }
521

522
  Stream<HistoryEvent> streamHistory(WorkflowExecution execution) {
523
    Preconditions.checkNotNull(execution, "execution is required");
1✔
524

525
    GetWorkflowExecutionHistoryIterator iterator =
1✔
526
        new GetWorkflowExecutionHistoryIterator(
527
            options.getNamespace(), execution, null, genericClient);
1✔
528
    iterator.init();
1✔
529

530
    // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
531
    // impossible
532
    final int CHARACTERISTICS =
1✔
533
        Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.DISTINCT | Spliterator.IMMUTABLE;
534

535
    return StreamSupport.stream(
1✔
536
        Spliterators.spliteratorUnknownSize(iterator, CHARACTERISTICS), false);
1✔
537
  }
538

539
  @Override
540
  public Object getInternal() {
541
    return this;
1✔
542
  }
543

544
  @Override
545
  public void registerWorkerFactory(WorkerFactory workerFactory) {
546
    workerFactoryRegistry.register(workerFactory);
1✔
547
  }
1✔
548

549
  @Override
550
  public void deregisterWorkerFactory(WorkerFactory workerFactory) {
551
    workerFactoryRegistry.deregister(workerFactory);
1✔
552
  }
1✔
553
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc