• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1764

pending completion
1764

push

buildkite

GitHub
Exposed startedEventAttribute and DataConverter (#799)

* exposing startedEventAttributes through DecisionContext in WorkflowInfo for using in ContinueAsNew

* addition of workflowEventAttributes in dummy implementation

* Addition of dataConverter

8 of 8 new or added lines in 3 files covered. (100.0%)

11112 of 18404 relevant lines covered (60.38%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.47
/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.uber.cadence.SearchAttributes;
21
import com.uber.cadence.WorkflowExecution;
22
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
23
import com.uber.cadence.WorkflowType;
24
import com.uber.cadence.context.ContextPropagator;
25
import com.uber.cadence.converter.DataConverter;
26
import com.uber.cadence.converter.JsonDataConverter;
27
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
28
import com.uber.cadence.internal.context.ContextThreadLocal;
29
import com.uber.cadence.internal.metrics.NoopScope;
30
import com.uber.cadence.internal.replay.ContinueAsNewWorkflowExecutionParameters;
31
import com.uber.cadence.internal.replay.DeciderCache;
32
import com.uber.cadence.internal.replay.DecisionContext;
33
import com.uber.cadence.internal.replay.ExecuteActivityParameters;
34
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
35
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
36
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
37
import com.uber.cadence.workflow.Functions.Func;
38
import com.uber.cadence.workflow.Functions.Func1;
39
import com.uber.cadence.workflow.Promise;
40
import com.uber.m3.tally.Scope;
41
import java.time.Duration;
42
import java.util.ArrayDeque;
43
import java.util.ArrayList;
44
import java.util.Collections;
45
import java.util.Deque;
46
import java.util.HashMap;
47
import java.util.HashSet;
48
import java.util.Iterator;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Optional;
52
import java.util.Random;
53
import java.util.Set;
54
import java.util.UUID;
55
import java.util.concurrent.ExecutionException;
56
import java.util.concurrent.ExecutorService;
57
import java.util.concurrent.Future;
58
import java.util.concurrent.SynchronousQueue;
59
import java.util.concurrent.ThreadFactory;
60
import java.util.concurrent.ThreadPoolExecutor;
61
import java.util.concurrent.TimeUnit;
62
import java.util.concurrent.locks.Lock;
63
import java.util.concurrent.locks.ReentrantLock;
64
import java.util.function.BiConsumer;
65
import java.util.function.Consumer;
66
import java.util.function.Supplier;
67
import org.slf4j.Logger;
68
import org.slf4j.LoggerFactory;
69

70
/** Throws Error in case of any unexpected condition. It is to fail a decision, not a workflow. */
71
class DeterministicRunnerImpl implements DeterministicRunner {
72

73
  private static class NamedRunnable {
74
    private final String name;
75
    private final Runnable runnable;
76

77
    private NamedRunnable(String name, Runnable runnable) {
1✔
78
      this.name = name;
1✔
79
      this.runnable = runnable;
1✔
80
    }
1✔
81
  }
82

83
  private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class);
1✔
84
  static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root";
85
  private static final ThreadLocal<WorkflowThread> currentThreadThreadLocal = new ThreadLocal<>();
1✔
86

87
  private final Lock lock = new ReentrantLock();
1✔
88
  private final ExecutorService threadPool;
89
  private final SyncDecisionContext decisionContext;
90
  private final Deque<WorkflowThread> threads = new ArrayDeque<>(); // protected by lock
1✔
91
  // Values from RunnerLocalInternal
92
  private final Map<RunnerLocalInternal<?>, Object> runnerLocalMap = new HashMap<>();
1✔
93

94
  private final List<WorkflowThread> threadsToAdd = Collections.synchronizedList(new ArrayList<>());
1✔
95
  private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();
1✔
96
  private final Supplier<Long> clock;
97
  private DeciderCache cache;
98
  private boolean inRunUntilAllBlocked;
99
  private boolean closeRequested;
100
  private boolean closed;
101

102
  static WorkflowThread currentThreadInternal() {
103
    WorkflowThread result = currentThreadThreadLocal.get();
1✔
104
    if (result == null) {
1✔
105
      throw new Error("Called from non workflow or workflow callback thread");
×
106
    }
107
    return result;
1✔
108
  }
109

110
  static void setCurrentThreadInternal(WorkflowThread coroutine) {
111
    currentThreadThreadLocal.set(coroutine);
1✔
112
  }
1✔
113

114
  /**
115
   * Time at which any thread that runs under sync can make progress. For example when {@link
116
   * com.uber.cadence.workflow.Workflow#sleep(long)} expires. 0 means no blocked threads.
117
   */
118
  private long nextWakeUpTime;
119
  /**
120
   * Used to check for failedPromises that contain an error, but never where accessed. It is to
121
   * avoid failure swallowing by failedPromises which is very hard to troubleshoot.
122
   */
123
  private Set<Promise> failedPromises = new HashSet<>();
1✔
124

125
  private boolean exitRequested;
126
  private Object exitValue;
127
  private WorkflowThread rootWorkflowThread;
128
  private final CancellationScopeImpl runnerCancellationScope;
129

130
  DeterministicRunnerImpl(Runnable root) {
131
    this(System::currentTimeMillis, root);
1✔
132
  }
1✔
133

134
  DeterministicRunnerImpl(Supplier<Long> clock, Runnable root) {
135
    this(getDefaultThreadPool(), newDummySyncDecisionContext(), clock, root, null);
1✔
136
  }
1✔
137

138
  private static ThreadPoolExecutor getDefaultThreadPool() {
139
    ThreadPoolExecutor result =
1✔
140
        new ThreadPoolExecutor(0, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
141
    result.setThreadFactory(
1✔
142
        new ThreadFactory() {
1✔
143
          @Override
144
          public Thread newThread(Runnable r) {
145
            return new Thread(r, "deterministic runner thread");
1✔
146
          }
147
        });
148
    return result;
1✔
149
  }
150

151
  DeterministicRunnerImpl(
152
      ExecutorService threadPool,
153
      SyncDecisionContext decisionContext,
154
      Supplier<Long> clock,
155
      Runnable root) {
156
    this(threadPool, decisionContext, clock, root, null);
1✔
157
  }
1✔
158

159
  DeterministicRunnerImpl(
160
      ExecutorService threadPool,
161
      SyncDecisionContext decisionContext,
162
      Supplier<Long> clock,
163
      Runnable root,
164
      DeciderCache cache) {
1✔
165
    this.threadPool = threadPool;
1✔
166
    this.decisionContext =
1✔
167
        decisionContext != null ? decisionContext : newDummySyncDecisionContext();
1✔
168
    this.clock = clock;
1✔
169
    this.cache = cache;
1✔
170
    runnerCancellationScope = new CancellationScopeImpl(true, null, null);
1✔
171

172
    // TODO: workflow instance specific thread name
173
    rootWorkflowThread =
1✔
174
        new WorkflowThreadImpl(
175
            true,
176
            threadPool,
177
            this,
178
            WORKFLOW_ROOT_THREAD_NAME,
179
            false,
180
            runnerCancellationScope,
181
            root,
182
            cache,
183
            getContextPropagators(),
1✔
184
            getPropagatedContexts());
1✔
185
    threads.addLast(rootWorkflowThread);
1✔
186
    rootWorkflowThread.start();
1✔
187
  }
1✔
188

189
  private static SyncDecisionContext newDummySyncDecisionContext() {
190
    return new SyncDecisionContext(
1✔
191
        new DummyDecisionContext(),
192
        JsonDataConverter.getInstance(),
1✔
193
        null,
194
        (next) -> next,
1✔
195
        null,
196
        null);
197
  }
198

199
  SyncDecisionContext getDecisionContext() {
200
    return decisionContext;
1✔
201
  }
202

203
  @Override
204
  public void runUntilAllBlocked() throws Throwable {
205
    lock.lock();
1✔
206
    try {
207
      checkClosed();
1✔
208

209
      inRunUntilAllBlocked = true;
1✔
210
      Throwable unhandledException = null;
1✔
211
      // Keep repeating until at least one of the threads makes progress.
212
      boolean progress;
213
      outerLoop:
214
      do {
215
        threadsToAdd.clear();
1✔
216

217
        if (!toExecuteInWorkflowThread.isEmpty()) {
1✔
218
          List<WorkflowThread> callbackThreads = new ArrayList<>(toExecuteInWorkflowThread.size());
1✔
219
          for (NamedRunnable nr : toExecuteInWorkflowThread) {
1✔
220
            WorkflowThread thread =
1✔
221
                new WorkflowThreadImpl(
222
                    false,
223
                    threadPool,
224
                    this,
225
                    nr.name,
1✔
226
                    false,
227
                    runnerCancellationScope,
228
                    nr.runnable,
1✔
229
                    cache,
230
                    getContextPropagators(),
1✔
231
                    getPropagatedContexts());
1✔
232
            callbackThreads.add(thread);
1✔
233
          }
1✔
234

235
          // It is important to prepend threads as there are callbacks
236
          // like signals that have to run before any other threads.
237
          // Otherwise signal might be never processed if it was received
238
          // after workflow decided to close.
239
          // Adding the callbacks in the same order as they appear in history.
240

241
          for (int i = callbackThreads.size() - 1; i >= 0; i--) {
1✔
242
            threads.addFirst(callbackThreads.get(i));
1✔
243
          }
244
        }
245

246
        toExecuteInWorkflowThread.clear();
1✔
247
        progress = false;
1✔
248
        Iterator<WorkflowThread> ci = threads.iterator();
1✔
249
        nextWakeUpTime = Long.MAX_VALUE;
1✔
250
        while (ci.hasNext()) {
1✔
251
          WorkflowThread c = ci.next();
1✔
252
          progress = c.runUntilBlocked() || progress;
1✔
253
          if (exitRequested) {
1✔
254
            close();
1✔
255
            break outerLoop;
1✔
256
          }
257
          if (c.isDone()) {
1✔
258
            ci.remove();
1✔
259
            if (c.getUnhandledException() != null) {
1✔
260
              unhandledException = c.getUnhandledException();
1✔
261
              break;
1✔
262
            }
263
          } else {
264
            long t = c.getBlockedUntil();
1✔
265
            if (t > currentTimeMillis() && t < nextWakeUpTime) {
1✔
266
              nextWakeUpTime = t;
1✔
267
            }
268
          }
269
        }
1✔
270
        if (unhandledException != null) {
1✔
271
          close();
1✔
272
          throw unhandledException;
1✔
273
        }
274
        for (WorkflowThread c : threadsToAdd) {
1✔
275
          threads.addLast(c);
1✔
276
        }
1✔
277
      } while (progress && !threads.isEmpty());
1✔
278

279
      if (nextWakeUpTime < currentTimeMillis() || nextWakeUpTime == Long.MAX_VALUE) {
1✔
280
        nextWakeUpTime = 0;
1✔
281
      }
282

283
    } finally {
284
      inRunUntilAllBlocked = false;
1✔
285
      // Close was requested while running
286
      if (closeRequested) {
1✔
287
        close();
1✔
288
      }
289
      lock.unlock();
1✔
290
    }
291
  }
1✔
292

293
  @Override
294
  public boolean isDone() {
295
    lock.lock();
1✔
296
    try {
297
      return closed || threads.isEmpty();
1✔
298
    } finally {
299
      lock.unlock();
1✔
300
    }
301
  }
302

303
  @Override
304
  @SuppressWarnings("unchecked")
305
  public Object getExitValue() {
306
    lock.lock();
1✔
307
    try {
308
      if (!closed) {
1✔
309
        throw new Error("not done");
×
310
      }
311
    } finally {
312
      lock.unlock();
1✔
313
    }
314
    return exitValue;
1✔
315
  }
316

317
  @Override
318
  public void cancel(String reason) {
319
    executeInWorkflowThread("cancel workflow callback", () -> rootWorkflowThread.cancel(reason));
1✔
320
  }
1✔
321

322
  @Override
323
  public void close() {
324
    List<Future<?>> threadFutures = new ArrayList<>();
1✔
325
    lock.lock();
1✔
326
    if (closed) {
1✔
327
      lock.unlock();
1✔
328
      return;
1✔
329
    }
330
    // Do not close while runUntilAllBlocked executes.
331
    // closeRequested tells it to call close() at the end.
332
    closeRequested = true;
1✔
333
    if (inRunUntilAllBlocked) {
1✔
334
      lock.unlock();
1✔
335
      return;
1✔
336
    }
337
    try {
338
      for (WorkflowThread c : threadsToAdd) {
1✔
339
        threads.addLast(c);
1✔
340
      }
1✔
341
      threadsToAdd.clear();
1✔
342

343
      for (WorkflowThread c : threads) {
1✔
344
        threadFutures.add(c.stopNow());
1✔
345
      }
1✔
346
      threads.clear();
1✔
347

348
      // We cannot use an iterator to unregister failed Promises since f.get()
349
      // will remove the promise directly from failedPromises. This causes an
350
      // ConcurrentModificationException
351
      // For this reason we will loop over a copy of failedPromises.
352
      Set<Promise> failedPromisesLoop = new HashSet<>(failedPromises);
1✔
353
      for (Promise f : failedPromisesLoop) {
1✔
354
        if (!f.isCompleted()) {
1✔
355
          throw new Error("expected failed");
×
356
        }
357
        try {
358
          f.get();
×
359
          throw new Error("unreachable");
×
360
        } catch (RuntimeException e) {
1✔
361
          log.warn(
1✔
362
              "Promise that was completedExceptionally was never accessed. "
363
                  + "The ignored exception:",
364
              CheckedExceptionWrapper.unwrap(e));
1✔
365
        }
366
      }
1✔
367
    } finally {
368
      closed = true;
1✔
369
      lock.unlock();
1✔
370
    }
371

372
    // Context is destroyed in c.StopNow(). Wait on all tasks outside the lock since
373
    // these tasks use the same lock to execute.
374
    for (Future<?> future : threadFutures) {
1✔
375
      try {
376
        future.get();
1✔
377
      } catch (InterruptedException e) {
×
378
        throw new Error("Unexpected interrupt", e);
×
379
      } catch (ExecutionException e) {
×
380
        throw new Error("Unexpected failure stopping coroutine", e);
×
381
      }
1✔
382
    }
1✔
383
  }
1✔
384

385
  @Override
386
  public String stackTrace() {
387
    StringBuilder result = new StringBuilder();
1✔
388
    lock.lock();
1✔
389
    try {
390
      for (WorkflowThread coroutine : threads) {
1✔
391
        if (result.length() > 0) {
1✔
392
          result.append("\n");
1✔
393
        }
394
        coroutine.addStackTrace(result);
1✔
395
      }
1✔
396
    } finally {
397
      lock.unlock();
1✔
398
    }
399
    return result.toString();
1✔
400
  }
401

402
  private void checkClosed() {
403
    if (closed) {
1✔
404
      throw new Error("closed");
×
405
    }
406
  }
1✔
407

408
  @Override
409
  public long currentTimeMillis() {
410
    return clock.get();
1✔
411
  }
412

413
  @Override
414
  public long getNextWakeUpTime() {
415
    lock.lock();
1✔
416
    try {
417
      checkClosed();
1✔
418
      if (decisionContext != null) {
1✔
419
        long nextFireTime = decisionContext.getNextFireTime();
1✔
420
        if (nextWakeUpTime == 0) {
1✔
421
          return nextFireTime;
1✔
422
        }
423
        if (nextFireTime == 0) {
1✔
424
          return nextWakeUpTime;
1✔
425
        }
426
        return Math.min(nextWakeUpTime, nextFireTime);
×
427
      }
428
      return nextWakeUpTime;
×
429
    } finally {
430
      lock.unlock();
1✔
431
    }
432
  }
433

434
  WorkflowThread newThread(Runnable runnable, boolean detached, String name) {
435
    checkWorkflowThreadOnly();
1✔
436
    checkClosed();
1✔
437
    WorkflowThread result =
1✔
438
        new WorkflowThreadImpl(
439
            false,
440
            threadPool,
441
            this,
442
            name,
443
            detached,
444
            CancellationScopeImpl.current(),
1✔
445
            runnable,
446
            cache,
447
            getContextPropagators(),
1✔
448
            getPropagatedContexts());
1✔
449
    threadsToAdd.add(result); // This is synchronized collection.
1✔
450
    return result;
1✔
451
  }
452

453
  /**
454
   * Executes before any other threads next time runUntilBlockedCalled. Must never be called from
455
   * any workflow threads.
456
   */
457
  @Override
458
  public void executeInWorkflowThread(String name, Runnable runnable) {
459
    lock.lock();
1✔
460
    try {
461
      checkClosed();
1✔
462
      toExecuteInWorkflowThread.add(new NamedRunnable(name, runnable));
1✔
463
    } finally {
464
      lock.unlock();
1✔
465
    }
466
  }
1✔
467

468
  Lock getLock() {
469
    return lock;
1✔
470
  }
471

472
  /** Register a promise that had failed but wasn't accessed yet. */
473
  void registerFailedPromise(Promise promise) {
474
    failedPromises.add(promise);
1✔
475
  }
1✔
476

477
  /** Forget a failed promise as it was accessed. */
478
  void forgetFailedPromise(Promise promise) {
479
    failedPromises.remove(promise);
1✔
480
  }
1✔
481

482
  <R> void exit(R value) {
483
    checkClosed();
1✔
484
    checkWorkflowThreadOnly();
1✔
485
    this.exitValue = value;
1✔
486
    this.exitRequested = true;
1✔
487
  }
1✔
488

489
  private void checkWorkflowThreadOnly() {
490
    if (!inRunUntilAllBlocked) {
1✔
491
      throw new Error("called from non workflow thread");
×
492
    }
493
  }
1✔
494

495
  @SuppressWarnings("unchecked")
496
  <T> Optional<T> getRunnerLocal(RunnerLocalInternal<T> key) {
497
    if (!runnerLocalMap.containsKey(key)) {
1✔
498
      return Optional.empty();
1✔
499
    }
500
    return Optional.of((T) runnerLocalMap.get(key));
1✔
501
  }
502

503
  <T> void setRunnerLocal(RunnerLocalInternal<T> key, T value) {
504
    runnerLocalMap.put(key, value);
1✔
505
  }
1✔
506

507
  /**
508
   * If we're executing as part of a workflow, get the current thread's context. Otherwise get the
509
   * context info from the DecisionContext
510
   */
511
  private Map<String, Object> getPropagatedContexts() {
512
    if (currentThreadThreadLocal.get() != null) {
1✔
513
      return ContextThreadLocal.getCurrentContextForPropagation();
1✔
514
    } else {
515
      return decisionContext.getContext().getPropagatedContexts();
1✔
516
    }
517
  }
518

519
  private List<ContextPropagator> getContextPropagators() {
520
    if (currentThreadThreadLocal.get() != null) {
1✔
521
      return ContextThreadLocal.getContextPropagators();
1✔
522
    } else {
523
      return decisionContext.getContext().getContextPropagators();
1✔
524
    }
525
  }
526

527
  private static final class DummyDecisionContext implements DecisionContext {
528

529
    @Override
530
    public WorkflowExecution getWorkflowExecution() {
531
      throw new UnsupportedOperationException("not implemented");
×
532
    }
533

534
    @Override
535
    public WorkflowExecution getParentWorkflowExecution() {
536
      throw new UnsupportedOperationException("not implemented");
×
537
    }
538

539
    @Override
540
    public WorkflowType getWorkflowType() {
541
      return new WorkflowType().setName("dummy-workflow");
1✔
542
    }
543

544
    @Override
545
    public boolean isCancelRequested() {
546
      throw new UnsupportedOperationException("not implemented");
×
547
    }
548

549
    @Override
550
    public ContinueAsNewWorkflowExecutionParameters getContinueAsNewOnCompletion() {
551
      throw new UnsupportedOperationException("not implemented");
×
552
    }
553

554
    @Override
555
    public void setContinueAsNewOnCompletion(
556
        ContinueAsNewWorkflowExecutionParameters continueParameters) {
557
      throw new UnsupportedOperationException("not implemented");
×
558
    }
559

560
    @Override
561
    public int getExecutionStartToCloseTimeoutSeconds() {
562
      throw new UnsupportedOperationException("not implemented");
×
563
    }
564

565
    @Override
566
    public String getTaskList() {
567
      return "dummy-task-list";
1✔
568
    }
569

570
    @Override
571
    public String getDomain() {
572
      return "dummy-domain";
1✔
573
    }
574

575
    @Override
576
    public String getWorkflowId() {
577
      return "dummy-workflow-id";
1✔
578
    }
579

580
    @Override
581
    public String getRunId() {
582
      return "dummy-run-id";
1✔
583
    }
584

585
    @Override
586
    public Duration getExecutionStartToCloseTimeout() {
587
      throw new UnsupportedOperationException("not implemented");
×
588
    }
589

590
    @Override
591
    public Duration getDecisionTaskTimeout() {
592
      throw new UnsupportedOperationException("not implemented");
×
593
    }
594

595
    @Override
596
    public SearchAttributes getSearchAttributes() {
597
      throw new UnsupportedOperationException("not implemented");
×
598
    }
599

600
    @Override
601
    public DataConverter getDataConverter() {
602
      return null;
×
603
    }
604

605
    @Override
606
    public Map<String, Object> getPropagatedContexts() {
607
      return null;
1✔
608
    }
609

610
    @Override
611
    public List<ContextPropagator> getContextPropagators() {
612
      return null;
1✔
613
    }
614

615
    @Override
616
    public WorkflowExecutionStartedEventAttributes getWorkflowExecutionStartedEventAttributes() {
617
      return null;
×
618
    }
619

620
    @Override
621
    public Consumer<Exception> scheduleActivityTask(
622
        ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
623
      throw new UnsupportedOperationException("not implemented");
×
624
    }
625

626
    @Override
627
    public Consumer<Exception> scheduleLocalActivityTask(
628
        ExecuteLocalActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
629
      throw new UnsupportedOperationException("not implemented");
×
630
    }
631

632
    @Override
633
    public Consumer<Exception> startChildWorkflow(
634
        StartChildWorkflowExecutionParameters parameters,
635
        Consumer<WorkflowExecution> executionCallback,
636
        BiConsumer<byte[], Exception> callback) {
637
      throw new UnsupportedOperationException("not implemented");
×
638
    }
639

640
    @Override
641
    public boolean isServerSideChildWorkflowRetry() {
642
      throw new UnsupportedOperationException("not implemented");
×
643
    }
644

645
    @Override
646
    public boolean isServerSideActivityRetry() {
647
      throw new UnsupportedOperationException("not implemented");
×
648
    }
649

650
    @Override
651
    public Consumer<Exception> signalWorkflowExecution(
652
        SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback) {
653
      throw new UnsupportedOperationException("not implemented");
×
654
    }
655

656
    @Override
657
    public Promise<Void> requestCancelWorkflowExecution(WorkflowExecution execution) {
658
      throw new UnsupportedOperationException("not implemented");
×
659
    }
660

661
    @Override
662
    public void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters parameters) {
663
      throw new UnsupportedOperationException("not implemented");
×
664
    }
665

666
    @Override
667
    public Optional<byte[]> mutableSideEffect(
668
        String id, DataConverter converter, Func1<Optional<byte[]>, Optional<byte[]>> func) {
669
      return func.apply(Optional.empty());
1✔
670
    }
671

672
    @Override
673
    public long currentTimeMillis() {
674
      throw new UnsupportedOperationException("not implemented");
×
675
    }
676

677
    @Override
678
    public boolean isReplaying() {
679
      throw new UnsupportedOperationException("not implemented");
×
680
    }
681

682
    @Override
683
    public Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback) {
684
      throw new UnsupportedOperationException("not implemented");
×
685
    }
686

687
    @Override
688
    public byte[] sideEffect(Func<byte[]> func) {
689
      throw new UnsupportedOperationException("not implemented");
×
690
    }
691

692
    @Override
693
    public int getVersion(
694
        String changeID, DataConverter converter, int minSupported, int maxSupported) {
695
      throw new UnsupportedOperationException("not implemented");
×
696
    }
697

698
    @Override
699
    public Random newRandom() {
700
      throw new UnsupportedOperationException("not implemented");
×
701
    }
702

703
    @Override
704
    public Scope getMetricsScope() {
705
      return NoopScope.getInstance();
1✔
706
    }
707

708
    @Override
709
    public boolean getEnableLoggingInReplay() {
710
      return false;
×
711
    }
712

713
    @Override
714
    public UUID randomUUID() {
715
      return UUID.randomUUID();
1✔
716
    }
717

718
    @Override
719
    public void upsertSearchAttributes(SearchAttributes searchAttributes) {
720
      throw new UnsupportedOperationException("not implemented");
×
721
    }
722
  }
723
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc