• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.24
/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.client;
22

23
import static io.temporal.internal.common.InternalUtils.createNexusBoundStub;
24

25
import com.google.common.base.Defaults;
26
import io.temporal.api.common.v1.WorkflowExecution;
27
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
28
import io.temporal.common.CronSchedule;
29
import io.temporal.common.MethodRetry;
30
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
31
import io.temporal.common.interceptors.WorkflowClientInterceptor;
32
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
33
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
34
import io.temporal.common.metadata.WorkflowMethodType;
35
import io.temporal.internal.client.NexusStartWorkflowRequest;
36
import io.temporal.internal.sync.StubMarker;
37
import io.temporal.workflow.QueryMethod;
38
import io.temporal.workflow.SignalMethod;
39
import io.temporal.workflow.UpdateMethod;
40
import io.temporal.workflow.WorkflowMethod;
41
import java.lang.reflect.InvocationHandler;
42
import java.lang.reflect.Method;
43
import java.util.*;
44

45
/**
46
 * Dynamic implementation of a strongly typed workflow interface that can be used to start, signal
47
 * and query workflows from external processes.
48
 */
49
class WorkflowInvocationHandler implements InvocationHandler {
50

51
  /** Call styles a proxy method invocation can be handled with. */
  public enum InvocationType {
    /** Direct call on the stub; the proxy returns the handler's result to the caller. */
    SYNC,

    /** Start of a workflow requested through an async invocation. */
    START,

    /** Start of a workflow plus asynchronous retrieval of its result. */
    EXECUTE,

    /** Call recorded into a signal-with-start batch. */
    SIGNAL_WITH_START,

    /** Start of a workflow on behalf of a Nexus operation. */
    START_NEXUS,

    /** Call recorded into an update-with-start operation. */
    UPDATE_WITH_START
  }
59

60
  interface SpecificInvocationHandler {
61
    InvocationType getInvocationType();
62

63
    void invoke(
64
        POJOWorkflowInterfaceMetadata workflowMetadata,
65
        WorkflowStub untyped,
66
        Method method,
67
        Object[] args)
68
        throws Throwable;
69

70
    <R> R getResult(Class<R> resultClass);
71
  }
72

73
  private static final ThreadLocal<SpecificInvocationHandler> invocationContext =
1✔
74
      new ThreadLocal<>();
75

76
  /** Must call {@link #closeAsyncInvocation()} if this one was called. */
77
  static void initAsyncInvocation(InvocationType type) {
78
    initAsyncInvocation(type, null);
1✔
79
  }
1✔
80

81
  /** Must call {@link #closeAsyncInvocation()} if this one was called. */
82
  static <T> void initAsyncInvocation(InvocationType type, T value) {
83
    if (invocationContext.get() != null) {
1✔
84
      throw new IllegalStateException("already in start invocation");
×
85
    }
86
    if (type == InvocationType.START) {
1✔
87
      invocationContext.set(new StartWorkflowInvocationHandler());
1✔
88
    } else if (type == InvocationType.EXECUTE) {
1✔
89
      invocationContext.set(new ExecuteWorkflowInvocationHandler());
1✔
90
    } else if (type == InvocationType.SIGNAL_WITH_START) {
1✔
91
      SignalWithStartBatchRequest batch = (SignalWithStartBatchRequest) value;
1✔
92
      invocationContext.set(new SignalWithStartWorkflowInvocationHandler(batch));
1✔
93
    } else if (type == InvocationType.START_NEXUS) {
1✔
94
      NexusStartWorkflowRequest request = (NexusStartWorkflowRequest) value;
1✔
95
      invocationContext.set(new StartNexusOperationInvocationHandler(request));
1✔
96
    } else if (type == InvocationType.UPDATE_WITH_START) {
1✔
97
      UpdateWithStartWorkflowOperation operation = (UpdateWithStartWorkflowOperation) value;
1✔
98
      invocationContext.set(new UpdateWithStartInvocationHandler(operation));
1✔
99
    } else {
1✔
100
      throw new IllegalArgumentException("Unexpected InvocationType: " + type);
×
101
    }
102
  }
1✔
103

104
  static <R> R getAsyncInvocationResult(Class<R> resultClass) {
105
    SpecificInvocationHandler invocation = invocationContext.get();
1✔
106
    if (invocation == null) {
1✔
107
      throw new IllegalStateException("initAsyncInvocation wasn't called");
×
108
    }
109
    return invocation.getResult(resultClass);
1✔
110
  }
111

112
  /** Closes async invocation created through {@link #initAsyncInvocation(InvocationType)} */
113
  static void closeAsyncInvocation() {
114
    invocationContext.remove();
1✔
115
  }
1✔
116

117
  private final WorkflowStub untyped;
118
  private final POJOWorkflowInterfaceMetadata workflowMetadata;
119

120
  @SuppressWarnings("deprecation")
121
  WorkflowInvocationHandler(
122
      Class<?> workflowInterface,
123
      WorkflowClientOptions clientOptions,
124
      WorkflowClientCallsInterceptor workflowClientCallsInvoker,
125
      WorkflowExecution execution) {
1✔
126
    workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface, false);
1✔
127
    Optional<String> workflowType = workflowMetadata.getWorkflowType();
1✔
128
    WorkflowStub stub =
1✔
129
        new WorkflowStubImpl(clientOptions, workflowClientCallsInvoker, workflowType, execution);
130
    for (WorkflowClientInterceptor i : clientOptions.getInterceptors()) {
1✔
131
      stub = i.newUntypedWorkflowStub(execution, workflowType, stub);
1✔
132
    }
133
    this.untyped = stub;
1✔
134
  }
1✔
135

136
  @SuppressWarnings("deprecation")
137
  WorkflowInvocationHandler(
138
      Class<?> workflowInterface,
139
      WorkflowClientOptions clientOptions,
140
      WorkflowClientCallsInterceptor workflowClientCallsInvoker,
141
      WorkflowOptions options) {
1✔
142
    Objects.requireNonNull(options, "options");
1✔
143
    workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface);
1✔
144
    Optional<POJOWorkflowMethodMetadata> workflowMethodMetadata =
1✔
145
        workflowMetadata.getWorkflowMethod();
1✔
146
    if (!workflowMethodMetadata.isPresent()) {
1✔
147
      throw new IllegalArgumentException(
×
148
          "Method annotated with @WorkflowMethod is not found in " + workflowInterface);
149
    }
150
    Method workflowMethod = workflowMethodMetadata.get().getWorkflowMethod();
1✔
151
    MethodRetry methodRetry = workflowMethod.getAnnotation(MethodRetry.class);
1✔
152
    CronSchedule cronSchedule = workflowMethod.getAnnotation(CronSchedule.class);
1✔
153
    WorkflowOptions mergedOptions = WorkflowOptions.merge(methodRetry, cronSchedule, options);
1✔
154
    String workflowType = workflowMethodMetadata.get().getName();
1✔
155
    WorkflowStub stub =
1✔
156
        new WorkflowStubImpl(
157
            clientOptions, workflowClientCallsInvoker, workflowType, mergedOptions);
158
    for (WorkflowClientInterceptor i : clientOptions.getInterceptors()) {
1✔
159
      stub = i.newUntypedWorkflowStub(workflowType, mergedOptions, stub);
1✔
160
    }
161
    this.untyped = stub;
1✔
162
  }
1✔
163

164
  @Override
165
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
166
    try {
167
      if (method.equals(Object.class.getMethod("toString"))) {
1✔
168
        // TODO: workflow info
169
        return "WorkflowInvocationHandler";
×
170
      }
171
    } catch (NoSuchMethodException e) {
×
172
      throw new Error("unexpected", e);
×
173
    }
1✔
174
    // Implement StubMarker
175
    if (method.getName().equals(StubMarker.GET_UNTYPED_STUB_METHOD)) {
1✔
176
      return untyped;
1✔
177
    }
178
    if (!method.getDeclaringClass().isInterface()) {
1✔
179
      throw new IllegalArgumentException(
×
180
          "Interface type is expected: " + method.getDeclaringClass());
×
181
    }
182
    SpecificInvocationHandler handler = invocationContext.get();
1✔
183
    if (handler == null) {
1✔
184
      handler = new SyncWorkflowInvocationHandler();
1✔
185
    }
186
    handler.invoke(this.workflowMetadata, untyped, method, args);
1✔
187
    if (handler.getInvocationType() == InvocationType.SYNC) {
1✔
188
      return handler.getResult(method.getReturnType());
1✔
189
    }
190
    return Defaults.defaultValue(method.getReturnType());
1✔
191
  }
192

193
  private static void startWorkflow(WorkflowStub untyped, Object[] args) {
194
    Optional<WorkflowOptions> options = untyped.getOptions();
1✔
195
    if (untyped.getExecution() == null
1✔
196
        || (options.isPresent()
1✔
197
            && options.get().getWorkflowIdReusePolicy()
1✔
198
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)) {
199
      try {
200
        untyped.start(args);
1✔
201
      } catch (WorkflowExecutionAlreadyStarted e) {
1✔
202
        // We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for
203
        // result.
204
        if (options.isPresent()
1✔
205
            && options.get().getWorkflowIdReusePolicy()
1✔
206
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE) {
207
          throw e;
×
208
        }
209
      }
1✔
210
    }
211
  }
1✔
212

213
  static void checkAnnotations(
214
      Method method,
215
      WorkflowMethod workflowMethod,
216
      QueryMethod queryMethod,
217
      SignalMethod signalMethod) {
218
    int count =
219
        (workflowMethod == null ? 0 : 1)
×
220
            + (queryMethod == null ? 0 : 1)
×
221
            + (signalMethod == null ? 0 : 1);
×
222
    if (count > 1) {
×
223
      throw new IllegalArgumentException(
×
224
          method
225
              + " must contain at most one annotation "
226
              + "from @WorkflowMethod, @QueryMethod or @SignalMethod");
227
    }
228
  }
×
229

230
  private static class StartWorkflowInvocationHandler implements SpecificInvocationHandler {
231

232
    private Object result;
233

234
    @Override
235
    public InvocationType getInvocationType() {
236
      return InvocationType.START;
1✔
237
    }
238

239
    @Override
240
    public void invoke(
241
        POJOWorkflowInterfaceMetadata workflowMetadata,
242
        WorkflowStub untyped,
243
        Method method,
244
        Object[] args) {
245
      WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
1✔
246
      if (workflowMethod == null) {
1✔
247
        throw new IllegalArgumentException(
×
248
            "WorkflowClient.start can be called only on a method annotated with @WorkflowMethod");
249
      }
250
      result = untyped.start(args);
1✔
251
    }
1✔
252

253
    @Override
254
    @SuppressWarnings("unchecked")
255
    public <R> R getResult(Class<R> resultClass) {
256
      return (R) result;
1✔
257
    }
258
  }
259

260
  private static class SyncWorkflowInvocationHandler implements SpecificInvocationHandler {
261

262
    private Object result;
263

264
    @Override
265
    public InvocationType getInvocationType() {
266
      return InvocationType.SYNC;
1✔
267
    }
268

269
    @Override
270
    public void invoke(
271
        POJOWorkflowInterfaceMetadata workflowMetadata,
272
        WorkflowStub untyped,
273
        Method method,
274
        Object[] args) {
275
      POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method);
1✔
276
      WorkflowMethodType type = methodMetadata.getType();
1✔
277
      if (type == WorkflowMethodType.WORKFLOW) {
1✔
278
        result = startWorkflow(untyped, method, args);
1✔
279
      } else if (type == WorkflowMethodType.QUERY) {
1✔
280
        result = queryWorkflow(methodMetadata, untyped, method, args);
1✔
281
      } else if (type == WorkflowMethodType.SIGNAL) {
1✔
282
        signalWorkflow(methodMetadata, untyped, method, args);
1✔
283
        result = null;
1✔
284
      } else if (type == WorkflowMethodType.UPDATE) {
1✔
285
        result = updateWorkflow(methodMetadata, untyped, method, args);
1✔
286
      } else {
287
        throw new IllegalArgumentException(
×
288
            method + " is not annotated with @WorkflowMethod, @QueryMethod, @UpdateMethod");
289
      }
290
    }
1✔
291

292
    @Override
293
    @SuppressWarnings("unchecked")
294
    public <R> R getResult(Class<R> resultClass) {
295
      return (R) result;
1✔
296
    }
297

298
    private void signalWorkflow(
299
        POJOWorkflowMethodMetadata methodMetadata,
300
        WorkflowStub untyped,
301
        Method method,
302
        Object[] args) {
303
      if (method.getReturnType() != Void.TYPE) {
1✔
304
        throw new IllegalArgumentException("Signal method must have void return type: " + method);
×
305
      }
306
      String signalName = methodMetadata.getName();
1✔
307
      untyped.signal(signalName, args);
1✔
308
    }
1✔
309

310
    private Object queryWorkflow(
311
        POJOWorkflowMethodMetadata methodMetadata,
312
        WorkflowStub untyped,
313
        Method method,
314
        Object[] args) {
315
      if (method.getReturnType() == Void.TYPE) {
1✔
316
        throw new IllegalArgumentException("Query method cannot have void return type: " + method);
×
317
      }
318
      String queryType = methodMetadata.getName();
1✔
319
      return untyped.query(queryType, method.getReturnType(), method.getGenericReturnType(), args);
1✔
320
    }
321

322
    private Object updateWorkflow(
323
        POJOWorkflowMethodMetadata methodMetadata,
324
        WorkflowStub untyped,
325
        Method method,
326
        Object[] args) {
327
      String updateType = methodMetadata.getName();
1✔
328
      return untyped.update(updateType, method.getReturnType(), args);
1✔
329
    }
330

331
    @SuppressWarnings("FutureReturnValueIgnored")
332
    private Object startWorkflow(WorkflowStub untyped, Method method, Object[] args) {
333
      WorkflowInvocationHandler.startWorkflow(untyped, args);
1✔
334
      return untyped.getResult(method.getReturnType(), method.getGenericReturnType());
1✔
335
    }
336
  }
337

338
  private static class ExecuteWorkflowInvocationHandler implements SpecificInvocationHandler {
339

340
    private Object result;
341

342
    @Override
343
    public InvocationType getInvocationType() {
344
      return InvocationType.EXECUTE;
1✔
345
    }
346

347
    @Override
348
    public void invoke(
349
        POJOWorkflowInterfaceMetadata workflowMetadata,
350
        WorkflowStub untyped,
351
        Method method,
352
        Object[] args) {
353
      WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
1✔
354
      if (workflowMethod == null) {
1✔
355
        throw new IllegalArgumentException(
×
356
            "WorkflowClient.execute can be called only on a method annotated with @WorkflowMethod");
357
      }
358
      WorkflowInvocationHandler.startWorkflow(untyped, args);
1✔
359
      result = untyped.getResultAsync(method.getReturnType(), method.getGenericReturnType());
1✔
360
    }
1✔
361

362
    @Override
363
    @SuppressWarnings("unchecked")
364
    public <R> R getResult(Class<R> resultClass) {
365
      return (R) result;
1✔
366
    }
367
  }
368

369
  private static class SignalWithStartWorkflowInvocationHandler
370
      implements SpecificInvocationHandler {
371

372
    private final SignalWithStartBatchRequest batch;
373

374
    public SignalWithStartWorkflowInvocationHandler(SignalWithStartBatchRequest batch) {
1✔
375
      this.batch = batch;
1✔
376
    }
1✔
377

378
    @Override
379
    public InvocationType getInvocationType() {
380
      return InvocationType.SIGNAL_WITH_START;
1✔
381
    }
382

383
    @Override
384
    public void invoke(
385
        POJOWorkflowInterfaceMetadata workflowMetadata,
386
        WorkflowStub untyped,
387
        Method method,
388
        Object[] args) {
389
      POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method);
1✔
390
      switch (methodMetadata.getType()) {
1✔
391
        case QUERY:
392
          throw new IllegalArgumentException(
×
393
              "SignalWithStart batch doesn't accept methods annotated with @QueryMethod");
394
        case WORKFLOW:
395
          batch.start(untyped, args);
1✔
396
          break;
1✔
397
        case SIGNAL:
398
          batch.signal(untyped, methodMetadata.getName(), args);
1✔
399
          break;
400
      }
401
    }
1✔
402

403
    @Override
404
    public <R> R getResult(Class<R> resultClass) {
405
      throw new IllegalStateException("No result is expected");
×
406
    }
407
  }
408

409
  private static class StartNexusOperationInvocationHandler implements SpecificInvocationHandler {
410
    private final NexusStartWorkflowRequest request;
411
    private Object result;
412

413
    public StartNexusOperationInvocationHandler(NexusStartWorkflowRequest request) {
1✔
414
      this.request = request;
1✔
415
    }
1✔
416

417
    @Override
418
    public InvocationType getInvocationType() {
419
      return InvocationType.START_NEXUS;
1✔
420
    }
421

422
    @Override
423
    public void invoke(
424
        POJOWorkflowInterfaceMetadata workflowMetadata,
425
        WorkflowStub untyped,
426
        Method method,
427
        Object[] args) {
428
      WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
1✔
429
      if (workflowMethod == null) {
1✔
430
        throw new IllegalArgumentException(
×
431
            "Only on a method annotated with @WorkflowMethod can be used to start a Nexus operation.");
432
      }
433

434
      result = createNexusBoundStub(untyped, request).start(args);
1✔
435
    }
1✔
436

437
    @Override
438
    @SuppressWarnings("unchecked")
439
    public <R> R getResult(Class<R> resultClass) {
440
      return (R) result;
1✔
441
    }
442
  }
443

444
  private static class UpdateWithStartInvocationHandler implements SpecificInvocationHandler {
445

446
    enum State {
1✔
447
      NOT_STARTED,
1✔
448
      START_RECEIVED,
1✔
449
      UPDATE_RECEIVED,
1✔
450
    }
451

452
    private final UpdateWithStartWorkflowOperation operation;
453

454
    private State state = State.NOT_STARTED;
1✔
455

456
    public UpdateWithStartInvocationHandler(UpdateWithStartWorkflowOperation operation) {
1✔
457
      this.operation = operation;
1✔
458
    }
1✔
459

460
    @Override
461
    public InvocationType getInvocationType() {
462
      return InvocationType.UPDATE_WITH_START;
1✔
463
    }
464

465
    @Override
466
    public void invoke(
467
        POJOWorkflowInterfaceMetadata workflowMetadata,
468
        WorkflowStub untyped,
469
        Method method,
470
        Object[] args) {
471

472
      POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method);
1✔
473

474
      if (state == State.NOT_STARTED) {
1✔
475
        UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class);
1✔
476
        if (updateMethod == null) {
1✔
477
          throw new IllegalArgumentException(
1✔
478
              "Method '" + method.getName() + "' is not an UpdateMethod");
1✔
479
        }
480
        this.operation.prepareUpdate(
1✔
481
            untyped,
482
            methodMetadata.getName(),
1✔
483
            method.getReturnType(),
1✔
484
            method.getGenericReturnType(),
1✔
485
            args);
486
        state = State.START_RECEIVED;
1✔
487
      } else if (state == State.START_RECEIVED) {
1✔
488
        WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
1✔
489
        if (workflowMethod == null) {
1✔
490
          throw new IllegalArgumentException(
1✔
491
              "Method '" + method.getName() + "' is not a WorkflowMethod");
1✔
492
        }
493
        this.operation.prepareStart(untyped);
1✔
494
        state = State.UPDATE_RECEIVED;
1✔
495
      } else {
1✔
496
        throw new IllegalArgumentException(
×
497
            "UpdateWithStartInvocationHandler called too many times");
498
      }
499
    }
1✔
500

501
    @Override
502
    public <R> R getResult(Class<R> resultClass) {
503
      throw new IllegalStateException("No result is expected");
×
504
    }
505
  }
506
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc