• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.49
/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.client;
22

23
import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Strings;
27
import com.google.common.collect.Iterators;
28
import com.google.common.reflect.TypeToken;
29
import com.uber.m3.tally.Scope;
30
import io.temporal.api.common.v1.WorkflowExecution;
31
import io.temporal.api.enums.v1.TaskReachability;
32
import io.temporal.api.history.v1.History;
33
import io.temporal.api.history.v1.HistoryEvent;
34
import io.temporal.api.workflowservice.v1.*;
35
import io.temporal.client.WorkflowInvocationHandler.InvocationType;
36
import io.temporal.common.WorkflowExecutionHistory;
37
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
38
import io.temporal.common.interceptors.WorkflowClientInterceptor;
39
import io.temporal.internal.WorkflowThreadMarker;
40
import io.temporal.internal.client.NexusStartWorkflowRequest;
41
import io.temporal.internal.client.RootWorkflowClientInvoker;
42
import io.temporal.internal.client.WorkerFactoryRegistry;
43
import io.temporal.internal.client.WorkflowClientInternal;
44
import io.temporal.internal.client.external.GenericWorkflowClient;
45
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
46
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
47
import io.temporal.internal.sync.StubMarker;
48
import io.temporal.serviceclient.MetricsTag;
49
import io.temporal.serviceclient.WorkflowServiceStubs;
50
import io.temporal.worker.WorkerFactory;
51
import io.temporal.workflow.*;
52
import java.lang.annotation.Annotation;
53
import java.lang.reflect.Method;
54
import java.lang.reflect.Proxy;
55
import java.util.*;
56
import java.util.concurrent.CompletableFuture;
57
import java.util.stream.Collectors;
58
import java.util.stream.Stream;
59
import java.util.stream.StreamSupport;
60
import javax.annotation.Nonnull;
61
import javax.annotation.Nullable;
62

63
final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClientInternal {
64

65
  private final GenericWorkflowClient genericClient;
66
  private final WorkflowClientOptions options;
67
  private final ManualActivityCompletionClientFactory manualActivityCompletionClientFactory;
68
  private final WorkflowClientCallsInterceptor workflowClientCallsInvoker;
69
  private final WorkflowServiceStubs workflowServiceStubs;
70
  private final Scope metricsScope;
71
  private final WorkflowClientInterceptor[] interceptors;
72
  private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry();
1✔
73

74
  /**
75
   * Creates client that connects to an instance of the Temporal Service. Cannot be used from within
76
   * workflow code.
77
   *
78
   * @param service client to the Temporal Service endpoint.
79
   * @param options Options (like {@link io.temporal.common.converter.DataConverter} override) for
80
   *     configuring client.
81
   */
82
  public static WorkflowClient newInstance(
83
      WorkflowServiceStubs service, WorkflowClientOptions options) {
84
    enforceNonWorkflowThread();
1✔
85
    return WorkflowThreadMarker.protectFromWorkflowThread(
1✔
86
        new WorkflowClientInternalImpl(service, options), WorkflowClient.class);
87
  }
88

89
  WorkflowClientInternalImpl(
90
      WorkflowServiceStubs workflowServiceStubs, WorkflowClientOptions options) {
1✔
91
    options = WorkflowClientOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
92
    this.options = options;
1✔
93
    this.workflowServiceStubs = workflowServiceStubs;
1✔
94
    this.metricsScope =
1✔
95
        workflowServiceStubs
96
            .getOptions()
1✔
97
            .getMetricsScope()
1✔
98
            .tagged(MetricsTag.defaultTags(options.getNamespace()));
1✔
99
    this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope);
1✔
100
    this.interceptors = options.getInterceptors();
1✔
101
    this.workflowClientCallsInvoker = initializeClientInvoker();
1✔
102
    this.manualActivityCompletionClientFactory =
1✔
103
        ManualActivityCompletionClientFactory.newFactory(
1✔
104
            workflowServiceStubs,
105
            options.getNamespace(),
1✔
106
            options.getIdentity(),
1✔
107
            options.getDataConverter());
1✔
108
  }
1✔
109

110
  private WorkflowClientCallsInterceptor initializeClientInvoker() {
111
    WorkflowClientCallsInterceptor workflowClientInvoker =
1✔
112
        new RootWorkflowClientInvoker(genericClient, options, workerFactoryRegistry);
113
    for (WorkflowClientInterceptor clientInterceptor : interceptors) {
1✔
114
      workflowClientInvoker =
1✔
115
          clientInterceptor.workflowClientCallsInterceptor(workflowClientInvoker);
1✔
116
    }
117
    return workflowClientInvoker;
1✔
118
  }
119

120
  @Override
121
  public WorkflowServiceStubs getWorkflowServiceStubs() {
122
    return workflowServiceStubs;
1✔
123
  }
124

125
  @Override
126
  public WorkflowClientOptions getOptions() {
127
    return options;
1✔
128
  }
129

130
  @Override
131
  @SuppressWarnings("unchecked")
132
  public <T> T newWorkflowStub(Class<T> workflowInterface, WorkflowOptions options) {
133
    checkAnnotation(workflowInterface, WorkflowMethod.class);
1✔
134
    WorkflowInvocationHandler invocationHandler =
1✔
135
        new WorkflowInvocationHandler(
136
            workflowInterface, this.getOptions(), workflowClientCallsInvoker, options);
1✔
137
    return (T)
1✔
138
        Proxy.newProxyInstance(
1✔
139
            workflowInterface.getClassLoader(),
1✔
140
            new Class<?>[] {workflowInterface, StubMarker.class},
141
            invocationHandler);
142
  }
143

144
  @SafeVarargs
145
  private static <T> void checkAnnotation(
146
      Class<T> workflowInterface, Class<? extends Annotation>... annotationClasses) {
147
    TypeToken<?>.TypeSet interfaces = TypeToken.of(workflowInterface).getTypes().interfaces();
1✔
148
    if (interfaces.isEmpty()) {
1✔
149
      throw new IllegalArgumentException("Workflow must implement at least one interface");
×
150
    }
151
    for (TypeToken<?> i : interfaces) {
1✔
152
      for (Method method : i.getRawType().getMethods()) {
1✔
153
        for (Class<? extends Annotation> annotationClass : annotationClasses) {
1✔
154
          Object workflowMethod = method.getAnnotation(annotationClass);
1✔
155
          if (workflowMethod != null) {
1✔
156
            return;
1✔
157
          }
158
        }
159
      }
160
    }
×
161
    throw new IllegalArgumentException(
×
162
        "Workflow interface "
163
            + workflowInterface.getName()
×
164
            + " doesn't have method annotated with any of "
165
            + Arrays.toString(annotationClasses));
×
166
  }
167

168
  @Override
169
  public <T> T newWorkflowStub(Class<T> workflowInterface, String workflowId) {
170
    return newWorkflowStub(workflowInterface, workflowId, Optional.empty());
1✔
171
  }
172

173
  @Override
174
  public <T> T newWorkflowStub(
175
      Class<T> workflowInterface, String workflowId, Optional<String> runId) {
176
    checkAnnotation(
1✔
177
        workflowInterface,
178
        WorkflowMethod.class,
179
        QueryMethod.class,
180
        SignalMethod.class,
181
        UpdateMethod.class);
182
    if (Strings.isNullOrEmpty(workflowId)) {
1✔
183
      throw new IllegalArgumentException("workflowId is null or empty");
×
184
    }
185
    WorkflowExecution execution =
186
        WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(runId.orElse("")).build();
1✔
187

188
    WorkflowInvocationHandler invocationHandler =
1✔
189
        new WorkflowInvocationHandler(
190
            workflowInterface, this.getOptions(), workflowClientCallsInvoker, execution);
1✔
191
    @SuppressWarnings("unchecked")
192
    T result =
1✔
193
        (T)
194
            Proxy.newProxyInstance(
1✔
195
                workflowInterface.getClassLoader(),
1✔
196
                new Class<?>[] {workflowInterface, StubMarker.class},
197
                invocationHandler);
198
    return result;
1✔
199
  }
200

201
  @Override
202
  public WorkflowStub newUntypedWorkflowStub(String workflowId) {
203
    return newUntypedWorkflowStub(workflowId, Optional.empty(), Optional.empty());
1✔
204
  }
205

206
  @Override
207
  @SuppressWarnings("deprecation")
208
  public WorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions workflowOptions) {
209
    WorkflowStub result =
1✔
210
        new WorkflowStubImpl(options, workflowClientCallsInvoker, workflowType, workflowOptions);
211
    for (WorkflowClientInterceptor i : interceptors) {
1✔
212
      result = i.newUntypedWorkflowStub(workflowType, workflowOptions, result);
1✔
213
    }
214
    return result;
1✔
215
  }
216

217
  @Override
218
  public WorkflowStub newUntypedWorkflowStub(
219
      String workflowId, Optional<String> runId, Optional<String> workflowType) {
220
    WorkflowExecution execution =
221
        WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(runId.orElse("")).build();
1✔
222
    return newUntypedWorkflowStub(execution, workflowType);
1✔
223
  }
224

225
  @Override
226
  @SuppressWarnings("deprecation")
227
  public WorkflowStub newUntypedWorkflowStub(
228
      WorkflowExecution execution, Optional<String> workflowType) {
229
    WorkflowStub result =
1✔
230
        new WorkflowStubImpl(options, workflowClientCallsInvoker, workflowType, execution);
231
    for (WorkflowClientInterceptor i : interceptors) {
1✔
232
      result = i.newUntypedWorkflowStub(execution, workflowType, result);
1✔
233
    }
234
    return result;
1✔
235
  }
236

237
  @Override
238
  public ActivityCompletionClient newActivityCompletionClient() {
239
    ActivityCompletionClient result =
1✔
240
        WorkflowThreadMarker.protectFromWorkflowThread(
1✔
241
            new ActivityCompletionClientImpl(
242
                manualActivityCompletionClientFactory, () -> {}, metricsScope, null),
1✔
243
            ActivityCompletionClient.class);
244
    for (WorkflowClientInterceptor i : interceptors) {
1✔
245
      result = i.newActivityCompletionClient(result);
1✔
246
    }
247
    return result;
1✔
248
  }
249

250
  @Override
251
  public BatchRequest newSignalWithStartRequest() {
252
    return new SignalWithStartBatchRequest();
1✔
253
  }
254

255
  @Override
256
  public WorkflowExecution signalWithStart(BatchRequest signalWithStartBatch) {
257
    return ((SignalWithStartBatchRequest) signalWithStartBatch).invoke();
1✔
258
  }
259

260
  @Override
261
  public Stream<WorkflowExecutionMetadata> listExecutions(@Nullable String query) {
262
    return listExecutions(query, null);
×
263
  }
264

265
  Stream<WorkflowExecutionMetadata> listExecutions(
266
      @Nullable String query, @Nullable Integer pageSize) {
267
    ListWorkflowExecutionIterator iterator =
×
268
        new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient);
×
269
    iterator.init();
×
270
    Iterator<WorkflowExecutionMetadata> wrappedIterator =
×
271
        Iterators.transform(
×
272
            iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter()));
×
273

274
    // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
275
    // impossible
276
    //  TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
277
    // API
278
    // guarantees absence of duplicates
279
    final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
×
280

281
    return StreamSupport.stream(
×
282
        Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false);
×
283
  }
284

285
  @Override
286
  public Stream<HistoryEvent> streamHistory(@Nonnull String workflowId) {
287
    return streamHistory(workflowId, null);
×
288
  }
289

290
  @Override
291
  public Stream<HistoryEvent> streamHistory(@Nonnull String workflowId, @Nullable String runId) {
292
    Preconditions.checkNotNull(workflowId, "workflowId is required");
1✔
293

294
    WorkflowExecution.Builder executionBuilder =
295
        WorkflowExecution.newBuilder().setWorkflowId(workflowId);
1✔
296
    if (runId != null) {
1✔
297
      executionBuilder.setRunId(runId);
1✔
298
    }
299
    WorkflowExecution execution = executionBuilder.build();
1✔
300

301
    return streamHistory(execution);
1✔
302
  }
303

304
  @Override
305
  public WorkflowExecutionHistory fetchHistory(@Nonnull String workflowId) {
306
    return fetchHistory(workflowId, null);
1✔
307
  }
308

309
  @Override
310
  public WorkflowExecutionHistory fetchHistory(@Nonnull String workflowId, @Nullable String runId) {
311
    Preconditions.checkNotNull(workflowId, "execution is required");
1✔
312

313
    return new WorkflowExecutionHistory(
1✔
314
        History.newBuilder()
1✔
315
            .addAllEvents(streamHistory(workflowId, runId).collect(Collectors.toList()))
1✔
316
            .build(),
1✔
317
        workflowId);
318
  }
319

320
  @Override
321
  public void updateWorkerBuildIdCompatability(
322
      @Nonnull String taskQueue, @Nonnull BuildIdOperation operation) {
323
    UpdateWorkerBuildIdCompatibilityRequest.Builder reqBuilder =
324
        UpdateWorkerBuildIdCompatibilityRequest.newBuilder()
×
325
            .setTaskQueue(taskQueue)
×
326
            .setNamespace(options.getNamespace());
×
327
    operation.augmentBuilder(reqBuilder);
×
328
    genericClient.updateWorkerBuildIdCompatability(reqBuilder.build());
×
329
  }
×
330

331
  @Override
332
  public WorkerBuildIdVersionSets getWorkerBuildIdCompatability(@Nonnull String taskQueue) {
333
    GetWorkerBuildIdCompatibilityRequest req =
334
        GetWorkerBuildIdCompatibilityRequest.newBuilder()
×
335
            .setTaskQueue(taskQueue)
×
336
            .setNamespace(options.getNamespace())
×
337
            .build();
×
338
    GetWorkerBuildIdCompatibilityResponse resp = genericClient.getWorkerBuildIdCompatability(req);
×
339
    return new WorkerBuildIdVersionSets(resp);
×
340
  }
341

342
  @Override
343
  public WorkerTaskReachability getWorkerTaskReachability(
344
      @Nonnull Iterable<String> buildIds,
345
      @Nonnull Iterable<String> taskQueues,
346
      TaskReachability reachability) {
347
    GetWorkerTaskReachabilityRequest req =
348
        GetWorkerTaskReachabilityRequest.newBuilder()
×
349
            .setNamespace(options.getNamespace())
×
350
            .addAllBuildIds(buildIds)
×
351
            .addAllTaskQueues(taskQueues)
×
352
            .setReachability(reachability)
×
353
            .build();
×
354
    GetWorkerTaskReachabilityResponse resp = genericClient.GetWorkerTaskReachability(req);
×
355
    return new WorkerTaskReachability(resp);
×
356
  }
357

358
  public static WorkflowExecution start(Functions.Proc workflow) {
359
    enforceNonWorkflowThread();
1✔
360
    WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START);
1✔
361
    try {
362
      workflow.apply();
1✔
363
      return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowExecution.class);
1✔
364
    } finally {
365
      WorkflowInvocationHandler.closeAsyncInvocation();
1✔
366
    }
367
  }
368

369
  public static <A1> WorkflowExecution start(Functions.Proc1<A1> workflow, A1 arg1) {
370
    return start(() -> workflow.apply(arg1));
1✔
371
  }
372

373
  public static <A1, A2> WorkflowExecution start(
374
      Functions.Proc2<A1, A2> workflow, A1 arg1, A2 arg2) {
375
    return start(() -> workflow.apply(arg1, arg2));
1✔
376
  }
377

378
  public static <A1, A2, A3> WorkflowExecution start(
379
      Functions.Proc3<A1, A2, A3> workflow, A1 arg1, A2 arg2, A3 arg3) {
380
    return start(() -> workflow.apply(arg1, arg2, arg3));
1✔
381
  }
382

383
  public static <A1, A2, A3, A4> WorkflowExecution start(
384
      Functions.Proc4<A1, A2, A3, A4> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
385
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
386
  }
387

388
  public static <A1, A2, A3, A4, A5> WorkflowExecution start(
389
      Functions.Proc5<A1, A2, A3, A4, A5> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5) {
390
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
391
  }
392

393
  public static <A1, A2, A3, A4, A5, A6> WorkflowExecution start(
394
      Functions.Proc6<A1, A2, A3, A4, A5, A6> workflow,
395
      A1 arg1,
396
      A2 arg2,
397
      A3 arg3,
398
      A4 arg4,
399
      A5 arg5,
400
      A6 arg6) {
401
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
402
  }
403

404
  public static <R> WorkflowExecution start(Functions.Func<R> workflow) {
405
    return start((Functions.Proc) workflow::apply);
1✔
406
  }
407

408
  public static <A1, R> WorkflowExecution start(Functions.Func1<A1, R> workflow, A1 arg1) {
409
    return start(() -> workflow.apply(arg1));
1✔
410
  }
411

412
  public static <A1, A2, R> WorkflowExecution start(
413
      Functions.Func2<A1, A2, R> workflow, A1 arg1, A2 arg2) {
414
    return start(() -> workflow.apply(arg1, arg2));
1✔
415
  }
416

417
  public static <A1, A2, A3, R> WorkflowExecution start(
418
      Functions.Func3<A1, A2, A3, R> workflow, A1 arg1, A2 arg2, A3 arg3) {
419
    return start(() -> workflow.apply(arg1, arg2, arg3));
1✔
420
  }
421

422
  public static <A1, A2, A3, A4, R> WorkflowExecution start(
423
      Functions.Func4<A1, A2, A3, A4, R> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
424
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
425
  }
426

427
  public static <A1, A2, A3, A4, A5, R> WorkflowExecution start(
428
      Functions.Func5<A1, A2, A3, A4, A5, R> workflow,
429
      A1 arg1,
430
      A2 arg2,
431
      A3 arg3,
432
      A4 arg4,
433
      A5 arg5) {
434
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
435
  }
436

437
  public static <A1, A2, A3, A4, A5, A6, R> WorkflowExecution start(
438
      Functions.Func6<A1, A2, A3, A4, A5, A6, R> workflow,
439
      A1 arg1,
440
      A2 arg2,
441
      A3 arg3,
442
      A4 arg4,
443
      A5 arg5,
444
      A6 arg6) {
445
    return start(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
446
  }
447

448
  @SuppressWarnings("unchecked")
449
  public static CompletableFuture<Void> execute(Functions.Proc workflow) {
450
    enforceNonWorkflowThread();
1✔
451
    WorkflowInvocationHandler.initAsyncInvocation(InvocationType.EXECUTE);
1✔
452
    try {
453
      workflow.apply();
1✔
454
      return WorkflowInvocationHandler.getAsyncInvocationResult(CompletableFuture.class);
1✔
455
    } finally {
456
      WorkflowInvocationHandler.closeAsyncInvocation();
1✔
457
    }
458
  }
459

460
  public static <A1> CompletableFuture<Void> execute(Functions.Proc1<A1> workflow, A1 arg1) {
461
    return execute(() -> workflow.apply(arg1));
1✔
462
  }
463

464
  public static <A1, A2> CompletableFuture<Void> execute(
465
      Functions.Proc2<A1, A2> workflow, A1 arg1, A2 arg2) {
466
    return execute(() -> workflow.apply(arg1, arg2));
1✔
467
  }
468

469
  public static <A1, A2, A3> CompletableFuture<Void> execute(
470
      Functions.Proc3<A1, A2, A3> workflow, A1 arg1, A2 arg2, A3 arg3) {
471
    return execute(() -> workflow.apply(arg1, arg2, arg3));
1✔
472
  }
473

474
  public static <A1, A2, A3, A4> CompletableFuture<Void> execute(
475
      Functions.Proc4<A1, A2, A3, A4> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
476
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
477
  }
478

479
  public static <A1, A2, A3, A4, A5> CompletableFuture<Void> execute(
480
      Functions.Proc5<A1, A2, A3, A4, A5> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5) {
481
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
482
  }
483

484
  public static <A1, A2, A3, A4, A5, A6> CompletableFuture<Void> execute(
485
      Functions.Proc6<A1, A2, A3, A4, A5, A6> workflow,
486
      A1 arg1,
487
      A2 arg2,
488
      A3 arg3,
489
      A4 arg4,
490
      A5 arg5,
491
      A6 arg6) {
492
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
493
  }
494

495
  @SuppressWarnings("unchecked")
496
  public static <R> CompletableFuture<R> execute(Functions.Func<R> workflow) {
497
    return (CompletableFuture<R>) execute((Functions.Proc) workflow::apply);
1✔
498
  }
499

500
  public static <A1, R> CompletableFuture<R> execute(Functions.Func1<A1, R> workflow, A1 arg1) {
501
    return execute(() -> workflow.apply(arg1));
1✔
502
  }
503

504
  public static <A1, A2, R> CompletableFuture<R> execute(
505
      Functions.Func2<A1, A2, R> workflow, A1 arg1, A2 arg2) {
506
    return execute(() -> workflow.apply(arg1, arg2));
1✔
507
  }
508

509
  public static <A1, A2, A3, R> CompletableFuture<R> execute(
510
      Functions.Func3<A1, A2, A3, R> workflow, A1 arg1, A2 arg2, A3 arg3) {
511
    return execute(() -> workflow.apply(arg1, arg2, arg3));
1✔
512
  }
513

514
  public static <A1, A2, A3, A4, R> CompletableFuture<R> execute(
515
      Functions.Func4<A1, A2, A3, A4, R> workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) {
516
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4));
1✔
517
  }
518

519
  public static <A1, A2, A3, A4, A5, R> CompletableFuture<R> execute(
520
      Functions.Func5<A1, A2, A3, A4, A5, R> workflow,
521
      A1 arg1,
522
      A2 arg2,
523
      A3 arg3,
524
      A4 arg4,
525
      A5 arg5) {
526
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5));
1✔
527
  }
528

529
  public static <A1, A2, A3, A4, A5, A6, R> CompletableFuture<R> execute(
530
      Functions.Func6<A1, A2, A3, A4, A5, A6, R> workflow,
531
      A1 arg1,
532
      A2 arg2,
533
      A3 arg3,
534
      A4 arg4,
535
      A5 arg5,
536
      A6 arg6) {
537
    return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
1✔
538
  }
539

540
  Stream<HistoryEvent> streamHistory(WorkflowExecution execution) {
541
    Preconditions.checkNotNull(execution, "execution is required");
1✔
542

543
    GetWorkflowExecutionHistoryIterator iterator =
1✔
544
        new GetWorkflowExecutionHistoryIterator(
545
            options.getNamespace(), execution, null, genericClient);
1✔
546
    iterator.init();
1✔
547

548
    // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
549
    // impossible
550
    final int CHARACTERISTICS =
1✔
551
        Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.DISTINCT | Spliterator.IMMUTABLE;
552

553
    return StreamSupport.stream(
1✔
554
        Spliterators.spliteratorUnknownSize(iterator, CHARACTERISTICS), false);
1✔
555
  }
556

557
  @Override
558
  public Object getInternal() {
559
    return this;
1✔
560
  }
561

562
  @Override
563
  public void registerWorkerFactory(WorkerFactory workerFactory) {
564
    workerFactoryRegistry.register(workerFactory);
1✔
565
  }
1✔
566

567
  @Override
568
  public void deregisterWorkerFactory(WorkerFactory workerFactory) {
569
    workerFactoryRegistry.deregister(workerFactory);
1✔
570
  }
1✔
571

572
  @Override
573
  public WorkflowExecution startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow) {
574
    enforceNonWorkflowThread();
1✔
575
    WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START_NEXUS, request);
1✔
576
    try {
577
      workflow.apply();
1✔
578
      return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowExecution.class);
1✔
579
    } finally {
580
      WorkflowInvocationHandler.closeAsyncInvocation();
1✔
581
    }
582
  }
583
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc