• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2555

24 Oct 2024 10:50PM UTC coverage: 66.622% (+0.4%) from 66.195%
2555

push

buildkite

web-flow
Refactor Test environment initialization to CadenceTestRule from WorkflowTest. (#923)

WorkflowTest is currently 6,000 lines long and contains nearly every test related to end-to-end client behavior. It has the rather neat property that it can run against both an instance of Cadence running in Docker and the test version. It is additionally parameterized to run the entire test suite with or without sticky execution enabled.

Due to the complexity of handling both environments, adding yet another test to WorkflowTest has always been the easiest option for developers. To allow tests to be split easily into other files, extract the core functionality into a JUnit test rule that can be reused by additional tests.

With the exception of testSignalCrossDomainExternalWorkflow and the replay tests that don't use the test environment, all tests have been left in WorkflowTest to be split out later.

12910 of 19378 relevant lines covered (66.62%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.58
/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.common;
19

20
import static java.nio.charset.StandardCharsets.UTF_8;
21

22
import com.google.common.io.CharStreams;
23
import com.google.gson.Gson;
24
import com.google.gson.GsonBuilder;
25
import com.google.gson.JsonElement;
26
import com.google.gson.JsonObject;
27
import com.google.gson.JsonParser;
28
import com.google.gson.JsonPrimitive;
29
import com.uber.cadence.ActivityType;
30
import com.uber.cadence.Decision;
31
import com.uber.cadence.DecisionType;
32
import com.uber.cadence.EntityNotExistsError;
33
import com.uber.cadence.EventType;
34
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
35
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
36
import com.uber.cadence.History;
37
import com.uber.cadence.HistoryEvent;
38
import com.uber.cadence.HistoryEventFilterType;
39
import com.uber.cadence.TaskList;
40
import com.uber.cadence.WorkflowExecution;
41
import com.uber.cadence.WorkflowExecutionCloseStatus;
42
import com.uber.cadence.WorkflowExecutionFailedEventAttributes;
43
import com.uber.cadence.WorkflowExecutionTerminatedEventAttributes;
44
import com.uber.cadence.WorkflowExecutionTimedOutEventAttributes;
45
import com.uber.cadence.WorkflowType;
46
import com.uber.cadence.client.WorkflowTerminatedException;
47
import com.uber.cadence.client.WorkflowTimedOutException;
48
import com.uber.cadence.common.RetryOptions;
49
import com.uber.cadence.common.WorkflowExecutionHistory;
50
import com.uber.cadence.serviceclient.IWorkflowService;
51
import java.io.File;
52
import java.io.IOException;
53
import java.io.Reader;
54
import java.lang.reflect.InvocationTargetException;
55
import java.lang.reflect.Method;
56
import java.lang.reflect.Modifier;
57
import java.nio.ByteBuffer;
58
import java.nio.file.Files;
59
import java.time.Duration;
60
import java.util.Collection;
61
import java.util.Date;
62
import java.util.Iterator;
63
import java.util.List;
64
import java.util.Map;
65
import java.util.Map.Entry;
66
import java.util.Optional;
67
import java.util.concurrent.CancellationException;
68
import java.util.concurrent.CompletableFuture;
69
import java.util.concurrent.TimeUnit;
70
import java.util.concurrent.TimeoutException;
71
import org.apache.thrift.TException;
72
import org.apache.thrift.async.AsyncMethodCallback;
73

74
/**
75
 * Convenience methods to be used by unit tests and during development.
76
 *
77
 * @author fateev
78
 */
79
public class WorkflowExecutionUtils {
×
80

81
  /**
82
   * Indentation for history and decisions pretty printing. Do not change it from 2 spaces. The gson
83
   * pretty printer has it hardcoded and changing it breaks the indentation of exception stack
84
   * traces.
85
   */
86
  private static final String INDENTATION = "  ";
87

88
  // Wait period for passive cluster to retry getting workflow result in case of replication delay.
89
  private static final long ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS = 500;
90

91
  /**
92
   * Returns result of a workflow instance execution or throws an exception if workflow did not
93
   * complete successfully.
94
   *
95
   * @param workflowType is optional.
96
   * @throws TimeoutException if workflow didn't complete within specified timeout
97
   * @throws CancellationException if workflow was cancelled
98
   * @throws WorkflowExecutionFailedException if workflow execution failed
99
   * @throws WorkflowTimedOutException if workflow execution exceeded its execution timeout and was
100
   *     forcefully terminated by the Cadence server.
101
   * @throws WorkflowTerminatedException if workflow execution was terminated through an external
102
   *     terminate command.
103
   */
104
  public static byte[] getWorkflowExecutionResult(
105
      IWorkflowService service,
106
      String domain,
107
      WorkflowExecution workflowExecution,
108
      Optional<String> workflowType,
109
      long timeout,
110
      TimeUnit unit)
111
      throws TimeoutException, CancellationException, WorkflowExecutionFailedException,
112
          WorkflowTerminatedException, WorkflowTimedOutException, EntityNotExistsError {
113
    // getInstanceCloseEvent waits for workflow completion including new runs.
114
    HistoryEvent closeEvent =
1✔
115
        getInstanceCloseEvent(service, domain, workflowExecution, timeout, unit);
1✔
116
    return getResultFromCloseEvent(workflowExecution, workflowType, closeEvent);
1✔
117
  }
118

119
  public static CompletableFuture<byte[]> getWorkflowExecutionResultAsync(
120
      IWorkflowService service,
121
      String domain,
122
      WorkflowExecution workflowExecution,
123
      Optional<String> workflowType,
124
      long timeout,
125
      TimeUnit unit) {
126
    return getInstanceCloseEventAsync(service, domain, workflowExecution, timeout, unit)
1✔
127
        .thenApply(
1✔
128
            (closeEvent) -> getResultFromCloseEvent(workflowExecution, workflowType, closeEvent));
1✔
129
  }
130

131
  private static byte[] getResultFromCloseEvent(
132
      WorkflowExecution workflowExecution, Optional<String> workflowType, HistoryEvent closeEvent) {
133
    if (closeEvent == null) {
1✔
134
      throw new IllegalStateException("Workflow is still running");
×
135
    }
136
    switch (closeEvent.getEventType()) {
1✔
137
      case WorkflowExecutionCompleted:
138
        return closeEvent.getWorkflowExecutionCompletedEventAttributes().getResult();
1✔
139
      case WorkflowExecutionCanceled:
140
        byte[] details = closeEvent.getWorkflowExecutionCanceledEventAttributes().getDetails();
1✔
141
        String message = details != null ? new String(details, UTF_8) : null;
1✔
142
        throw new CancellationException(message);
1✔
143
      case WorkflowExecutionFailed:
144
        WorkflowExecutionFailedEventAttributes failed =
1✔
145
            closeEvent.getWorkflowExecutionFailedEventAttributes();
1✔
146
        throw new WorkflowExecutionFailedException(
1✔
147
            failed.getReason(), failed.getDetails(), failed.getDecisionTaskCompletedEventId());
1✔
148
      case WorkflowExecutionTerminated:
149
        WorkflowExecutionTerminatedEventAttributes terminated =
×
150
            closeEvent.getWorkflowExecutionTerminatedEventAttributes();
×
151
        throw new WorkflowTerminatedException(
×
152
            workflowExecution,
153
            workflowType,
154
            terminated.getReason(),
×
155
            terminated.getIdentity(),
×
156
            terminated.getDetails());
×
157
      case WorkflowExecutionTimedOut:
158
        WorkflowExecutionTimedOutEventAttributes timedOut =
1✔
159
            closeEvent.getWorkflowExecutionTimedOutEventAttributes();
1✔
160
        throw new WorkflowTimedOutException(
1✔
161
            workflowExecution, workflowType, timedOut.getTimeoutType());
1✔
162
      default:
163
        throw new RuntimeException(
×
164
            "Workflow end state is not completed: " + prettyPrintHistoryEvent(closeEvent));
×
165
    }
166
  }
167

168
  /** Returns an instance closing event, potentially waiting for workflow to complete. */
169
  private static HistoryEvent getInstanceCloseEvent(
170
      IWorkflowService service,
171
      String domain,
172
      WorkflowExecution workflowExecution,
173
      long timeout,
174
      TimeUnit unit)
175
      throws TimeoutException, EntityNotExistsError {
176
    byte[] pageToken = null;
1✔
177
    GetWorkflowExecutionHistoryResponse response;
178
    // TODO: Interrupt service long poll call on timeout and on interrupt
179
    long start = System.currentTimeMillis();
1✔
180
    HistoryEvent event;
181
    do {
182
      if (timeout != 0 && System.currentTimeMillis() - start > unit.toMillis(timeout)) {
1✔
183
        throw new TimeoutException(
×
184
            "WorkflowId="
185
                + workflowExecution.getWorkflowId()
×
186
                + ", runId="
187
                + workflowExecution.getRunId()
×
188
                + ", timeout="
189
                + timeout
190
                + ", unit="
191
                + unit);
192
      }
193

194
      GetWorkflowExecutionHistoryRequest r = new GetWorkflowExecutionHistoryRequest();
1✔
195
      r.setDomain(domain);
1✔
196
      r.setExecution(workflowExecution);
1✔
197
      r.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
1✔
198
      r.setNextPageToken(pageToken);
1✔
199
      r.setWaitForNewEvent(true);
1✔
200
      r.setSkipArchival(true);
1✔
201
      RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
1✔
202
      try {
203
        response =
1✔
204
            RpcRetryer.retryWithResult(
1✔
205
                retryOptions,
206
                () -> service.GetWorkflowExecutionHistoryWithTimeout(r, unit.toMillis(timeout)));
1✔
207
      } catch (EntityNotExistsError e) {
×
208
        if (e.activeCluster != null
×
209
            && e.currentCluster != null
210
            && !e.activeCluster.equals(e.currentCluster)) {
×
211
          // Current cluster is passive cluster. Execution might not exist because of replication
212
          // lag. If we are still within timeout, wait for a little bit and retry.
213
          if (timeout != 0
×
214
              && System.currentTimeMillis() + ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS - start
×
215
                  > unit.toMillis(timeout)) {
×
216
            throw e;
×
217
          }
218

219
          try {
220
            Thread.sleep(ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS);
×
221
          } catch (InterruptedException ie) {
×
222
            // Throw entity not exist here.
223
            throw e;
×
224
          }
×
225
          continue;
×
226
        }
227
        throw e;
×
228
      } catch (TException e) {
×
229
        throw CheckedExceptionWrapper.wrap(e);
×
230
      }
1✔
231

232
      pageToken = response.getNextPageToken();
1✔
233
      History history = response.getHistory();
1✔
234
      if (history != null && history.getEvents().size() > 0) {
1✔
235
        event = history.getEvents().get(0);
1✔
236
        if (!isWorkflowExecutionCompletedEvent(event)) {
1✔
237
          throw new RuntimeException("Last history event is not completion event: " + event);
×
238
        }
239
        // Workflow called continueAsNew. Start polling the new generation with new runId.
240
        if (event.getEventType() == EventType.WorkflowExecutionContinuedAsNew) {
1✔
241
          pageToken = null;
1✔
242
          workflowExecution =
1✔
243
              new WorkflowExecution()
244
                  .setWorkflowId(workflowExecution.getWorkflowId())
1✔
245
                  .setRunId(
1✔
246
                      event
247
                          .getWorkflowExecutionContinuedAsNewEventAttributes()
1✔
248
                          .getNewExecutionRunId());
1✔
249
          continue;
1✔
250
        }
251
        break;
252
      }
253
    } while (true);
×
254
    return event;
1✔
255
  }
256

257
  /** Returns an instance closing event, potentially waiting for workflow to complete. */
258
  private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
259
      IWorkflowService service,
260
      String domain,
261
      final WorkflowExecution workflowExecution,
262
      long timeout,
263
      TimeUnit unit) {
264
    return getInstanceCloseEventAsync(service, domain, workflowExecution, null, timeout, unit);
1✔
265
  }
266

267
  private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
268
      IWorkflowService service,
269
      String domain,
270
      final WorkflowExecution workflowExecution,
271
      byte[] pageToken,
272
      long timeout,
273
      TimeUnit unit) {
274
    // TODO: Interrupt service long poll call on timeout and on interrupt
275
    long start = System.currentTimeMillis();
1✔
276
    GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
1✔
277
    request.setDomain(domain);
1✔
278
    request.setExecution(workflowExecution);
1✔
279
    request.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
1✔
280
    request.setWaitForNewEvent(true);
1✔
281
    request.setNextPageToken(pageToken);
1✔
282
    CompletableFuture<GetWorkflowExecutionHistoryResponse> response =
1✔
283
        getWorkflowExecutionHistoryAsync(service, request, timeout, unit);
1✔
284
    return response.thenComposeAsync(
1✔
285
        (r) -> {
286
          long elapsedTime = System.currentTimeMillis() - start;
1✔
287
          if (timeout != 0 && elapsedTime > unit.toMillis(timeout)) {
1✔
288
            throw CheckedExceptionWrapper.wrap(
×
289
                new TimeoutException(
290
                    "WorkflowId="
291
                        + workflowExecution.getWorkflowId()
×
292
                        + ", runId="
293
                        + workflowExecution.getRunId()
×
294
                        + ", timeout="
295
                        + timeout
296
                        + ", unit="
297
                        + unit));
298
          }
299
          History history = r.getHistory();
1✔
300
          if (history == null || history.getEvents().size() == 0) {
1✔
301
            // Empty poll returned
302
            return getInstanceCloseEventAsync(
×
303
                service, domain, workflowExecution, pageToken, timeout - elapsedTime, unit);
304
          }
305
          HistoryEvent event = history.getEvents().get(0);
1✔
306
          if (!isWorkflowExecutionCompletedEvent(event)) {
1✔
307
            throw new RuntimeException("Last history event is not completion event: " + event);
×
308
          }
309
          // Workflow called continueAsNew. Start polling the new generation with new runId.
310
          if (event.getEventType() == EventType.WorkflowExecutionContinuedAsNew) {
1✔
311
            WorkflowExecution nextWorkflowExecution =
×
312
                new WorkflowExecution()
313
                    .setWorkflowId(workflowExecution.getWorkflowId())
×
314
                    .setRunId(
×
315
                        event
316
                            .getWorkflowExecutionContinuedAsNewEventAttributes()
×
317
                            .getNewExecutionRunId());
×
318
            return getInstanceCloseEventAsync(
×
319
                service,
320
                domain,
321
                nextWorkflowExecution,
322
                r.getNextPageToken(),
×
323
                timeout - elapsedTime,
324
                unit);
325
          }
326
          return CompletableFuture.completedFuture(event);
1✔
327
        });
328
  }
329

330
  private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit unit) {
331
    return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS)
1✔
332
        .setExpiration(Duration.ofSeconds(unit.toSeconds(timeout)))
1✔
333
        .build();
1✔
334
  }
335

336
  private static CompletableFuture<GetWorkflowExecutionHistoryResponse>
337
      getWorkflowExecutionHistoryAsync(
338
          IWorkflowService service,
339
          GetWorkflowExecutionHistoryRequest r,
340
          long timeout,
341
          TimeUnit unit) {
342
    RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
1✔
343
    return RpcRetryer.retryWithResultAsync(
1✔
344
        retryOptions,
345
        () -> {
346
          CompletableFuture<GetWorkflowExecutionHistoryResponse> result = new CompletableFuture<>();
1✔
347
          try {
348
            service.GetWorkflowExecutionHistoryWithTimeout(
1✔
349
                r,
350
                new AsyncMethodCallback<GetWorkflowExecutionHistoryResponse>() {
1✔
351
                  @Override
352
                  public void onComplete(GetWorkflowExecutionHistoryResponse response) {
353
                    result.complete(response);
1✔
354
                  }
1✔
355

356
                  @Override
357
                  public void onError(Exception exception) {
358
                    result.completeExceptionally(exception);
×
359
                  }
×
360
                },
361
                unit.toMillis(timeout));
1✔
362
          } catch (TException e) {
×
363
            result.completeExceptionally(e);
×
364
          }
1✔
365
          return result;
1✔
366
        });
367
  }
368

369
  public static boolean isWorkflowExecutionCompletedEvent(HistoryEvent event) {
370
    return ((event != null)
1✔
371
        && (event.getEventType() == EventType.WorkflowExecutionCompleted
1✔
372
            || event.getEventType() == EventType.WorkflowExecutionCanceled
1✔
373
            || event.getEventType() == EventType.WorkflowExecutionFailed
1✔
374
            || event.getEventType() == EventType.WorkflowExecutionTimedOut
1✔
375
            || event.getEventType() == EventType.WorkflowExecutionContinuedAsNew
1✔
376
            || event.getEventType() == EventType.WorkflowExecutionTerminated));
1✔
377
  }
378

379
  public static boolean isWorkflowExecutionCompleteDecision(Decision decision) {
380
    return ((decision != null)
1✔
381
        && (decision.getDecisionType() == DecisionType.CompleteWorkflowExecution
1✔
382
            || decision.getDecisionType() == DecisionType.CancelWorkflowExecution
1✔
383
            || decision.getDecisionType() == DecisionType.FailWorkflowExecution
1✔
384
            || decision.getDecisionType() == DecisionType.ContinueAsNewWorkflowExecution));
1✔
385
  }
386

387
  public static String getId(HistoryEvent historyEvent) {
388
    String id = null;
×
389
    if (historyEvent != null) {
×
390
      if (historyEvent.getEventType() == EventType.StartChildWorkflowExecutionFailed) {
×
391
        id = historyEvent.getStartChildWorkflowExecutionFailedEventAttributes().getWorkflowId();
×
392
      }
393
    }
394

395
    return id;
×
396
  }
397

398
  public static WorkflowExecutionCloseStatus getCloseStatus(HistoryEvent event) {
399
    switch (event.getEventType()) {
1✔
400
      case WorkflowExecutionCanceled:
401
        return WorkflowExecutionCloseStatus.CANCELED;
×
402
      case WorkflowExecutionFailed:
403
        return WorkflowExecutionCloseStatus.FAILED;
1✔
404
      case WorkflowExecutionTimedOut:
405
        return WorkflowExecutionCloseStatus.TIMED_OUT;
×
406
      case WorkflowExecutionContinuedAsNew:
407
        return WorkflowExecutionCloseStatus.CONTINUED_AS_NEW;
1✔
408
      case WorkflowExecutionCompleted:
409
        return WorkflowExecutionCloseStatus.COMPLETED;
×
410
      case WorkflowExecutionTerminated:
411
        return WorkflowExecutionCloseStatus.TERMINATED;
×
412
      default:
413
        throw new IllegalArgumentException("Not close event: " + event);
×
414
    }
415
  }
416

417
  public static GetWorkflowExecutionHistoryResponse getHistoryPage(
418
      byte[] nextPageToken,
419
      IWorkflowService service,
420
      String domain,
421
      WorkflowExecution workflowExecution) {
422

423
    GetWorkflowExecutionHistoryRequest getHistoryRequest = new GetWorkflowExecutionHistoryRequest();
1✔
424
    getHistoryRequest.setDomain(domain);
1✔
425
    getHistoryRequest.setExecution(workflowExecution);
1✔
426
    getHistoryRequest.setNextPageToken(nextPageToken);
1✔
427

428
    GetWorkflowExecutionHistoryResponse history;
429
    try {
430
      history = service.GetWorkflowExecutionHistory(getHistoryRequest);
1✔
431
    } catch (TException e) {
×
432
      throw new Error(e);
×
433
    }
1✔
434
    if (history == null) {
1✔
435
      throw new IllegalArgumentException("unknown workflow execution: " + workflowExecution);
×
436
    }
437
    return history;
1✔
438
  }
439

440
  public static Iterator<HistoryEvent> getHistory(
441
      IWorkflowService service, String domain, WorkflowExecution workflowExecution) {
442
    return new Iterator<HistoryEvent>() {
×
443
      byte[] nextPageToken;
444
      Iterator<HistoryEvent> current;
445

446
      {
447
        getNextPage();
×
448
      }
×
449

450
      @Override
451
      public boolean hasNext() {
452
        return current.hasNext() || nextPageToken != null;
×
453
      }
454

455
      @Override
456
      public HistoryEvent next() {
457
        if (current.hasNext()) {
×
458
          return current.next();
×
459
        }
460
        getNextPage();
×
461
        return current.next();
×
462
      }
463

464
      private void getNextPage() {
465
        GetWorkflowExecutionHistoryResponse history =
×
466
            getHistoryPage(nextPageToken, service, domain, workflowExecution);
×
467
        current = history.getHistory().getEvents().iterator();
×
468
        nextPageToken = history.getNextPageToken();
×
469
      }
×
470
    };
471
  }
472

473
  /**
474
   * Returns workflow instance history in a human readable format.
475
   *
476
   * @param showWorkflowTasks when set to false workflow task events (decider events) are not
477
   *     included
478
   * @param history Workflow instance history
479
   */
480
  public static String prettyPrintHistory(History history, boolean showWorkflowTasks) {
481
    return prettyPrintHistory(history.getEvents().iterator(), showWorkflowTasks);
1✔
482
  }
483

484
  public static String prettyPrintHistory(
485
      Iterator<HistoryEvent> events, boolean showWorkflowTasks) {
486
    StringBuilder result = new StringBuilder();
1✔
487
    result.append("{");
1✔
488
    boolean first = true;
1✔
489
    long firstTimestamp = 0;
1✔
490
    while (events.hasNext()) {
1✔
491
      HistoryEvent event = events.next();
1✔
492
      if (!showWorkflowTasks && event.getEventType().toString().startsWith("WorkflowTask")) {
1✔
493
        continue;
×
494
      }
495
      if (first) {
1✔
496
        first = false;
1✔
497
        firstTimestamp = event.getTimestamp();
1✔
498
      } else {
499
        result.append(",");
1✔
500
      }
501
      result.append("\n");
1✔
502
      result.append(INDENTATION);
1✔
503
      result.append(prettyPrintHistoryEvent(event, firstTimestamp));
1✔
504
    }
1✔
505
    result.append("\n}");
1✔
506
    return result.toString();
1✔
507
  }
508

509
  public static String prettyPrintDecisions(Iterable<Decision> decisions) {
510
    StringBuilder result = new StringBuilder();
×
511
    result.append("{");
×
512
    boolean first = true;
×
513
    for (Decision decision : decisions) {
×
514
      if (first) {
×
515
        first = false;
×
516
      } else {
517
        result.append(",");
×
518
      }
519
      result.append("\n");
×
520
      result.append(INDENTATION);
×
521
      result.append(prettyPrintDecision(decision));
×
522
    }
×
523
    result.append("\n}");
×
524
    return result.toString();
×
525
  }
526

527
  /**
528
   * Returns single event in a human readable format
529
   *
530
   * @param event event to pretty print
531
   */
532
  public static String prettyPrintHistoryEvent(HistoryEvent event) {
533
    return prettyPrintHistoryEvent(event, -1);
×
534
  }
535

536
  private static String prettyPrintHistoryEvent(HistoryEvent event, long firstTimestamp) {
537
    String eventType = event.getEventType().toString();
1✔
538
    StringBuilder result = new StringBuilder();
1✔
539
    result.append(event.getEventId());
1✔
540
    result.append(": ");
1✔
541
    result.append(eventType);
1✔
542
    if (firstTimestamp > 0) {
1✔
543
      // timestamp is in nanos
544
      long timestamp = (event.getTimestamp() - firstTimestamp) / 1_000_000;
1✔
545
      result.append(String.format(" [%s ms]", timestamp));
1✔
546
    }
547
    result.append(" ");
1✔
548
    result.append(
1✔
549
        prettyPrintObject(
1✔
550
            getEventAttributes(event), "getFieldValue", true, INDENTATION, false, false));
1✔
551

552
    return result.toString();
1✔
553
  }
554

555
  private static Object getEventAttributes(HistoryEvent event) {
556
    try {
557
      Method m = HistoryEvent.class.getMethod("get" + event.getEventType() + "EventAttributes");
1✔
558
      return m.invoke(event);
1✔
559
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
×
560
      return event;
×
561
    }
562
  }
563

564
  /**
565
   * Returns single decision in a human readable format
566
   *
567
   * @param decision decision to pretty print
568
   */
569
  public static String prettyPrintDecision(Decision decision) {
570
    return prettyPrintObject(decision, "getFieldValue", true, INDENTATION, true, true);
×
571
  }
572

573
  /**
574
   * Not really a generic method for printing random object graphs. But it works for events and
575
   * decisions.
576
   */
577
  private static String prettyPrintObject(
578
      Object object,
579
      String methodToSkip,
580
      boolean skipNullsAndEmptyCollections,
581
      String indentation,
582
      boolean skipLevel,
583
      boolean printTypeName) {
584
    StringBuilder result = new StringBuilder();
1✔
585
    if (object == null) {
1✔
586
      return "null";
×
587
    }
588
    Class<? extends Object> clz = object.getClass();
1✔
589
    if (Number.class.isAssignableFrom(clz)) {
1✔
590
      return String.valueOf(object);
1✔
591
    }
592
    if (Boolean.class.isAssignableFrom(clz)) {
1✔
593
      return String.valueOf(object);
×
594
    }
595
    if (clz.equals(String.class)) {
1✔
596
      return (String) object;
1✔
597
    }
598
    if (clz.equals(byte[].class)) {
1✔
599
      return new String((byte[]) object, UTF_8);
1✔
600
    }
601
    if (ByteBuffer.class.isAssignableFrom(clz)) {
1✔
602
      byte[] bytes = org.apache.thrift.TBaseHelper.byteBufferToByteArray((ByteBuffer) object);
×
603
      return new String(bytes, UTF_8);
×
604
    }
605
    if (clz.equals(Date.class)) {
1✔
606
      return String.valueOf(object);
×
607
    }
608
    if (clz.equals(TaskList.class)) {
1✔
609
      return String.valueOf(((TaskList) object).getName());
1✔
610
    }
611
    if (clz.equals(ActivityType.class)) {
1✔
612
      return String.valueOf(((ActivityType) object).getName());
1✔
613
    }
614
    if (clz.equals(WorkflowType.class)) {
1✔
615
      return String.valueOf(((WorkflowType) object).getName());
1✔
616
    }
617
    if (Map.Entry.class.isAssignableFrom(clz)) {
1✔
618
      result.append(
×
619
          prettyPrintObject(
×
620
              ((Map.Entry) object).getKey(),
×
621
              methodToSkip,
622
              skipNullsAndEmptyCollections,
623
              "",
624
              skipLevel,
625
              printTypeName));
626
      result.append("=");
×
627
      result.append(
×
628
          prettyPrintObject(
×
629
              ((Map.Entry) object).getValue(),
×
630
              methodToSkip,
631
              skipNullsAndEmptyCollections,
632
              "",
633
              skipLevel,
634
              printTypeName));
635
      return result.toString();
×
636
    }
637
    if (Map.class.isAssignableFrom(clz)) {
1✔
638
      result.append("{ ");
×
639

640
      String prefix = "";
×
641
      for (Object entry : ((Map) object).entrySet()) {
×
642
        result.append(prefix);
×
643
        prefix = ", ";
×
644
        result.append(
×
645
            prettyPrintObject(
×
646
                entry, methodToSkip, skipNullsAndEmptyCollections, "", skipLevel, printTypeName));
647
      }
×
648

649
      result.append(" }");
×
650
      return result.toString();
×
651
    }
652
    if (Collection.class.isAssignableFrom(clz)) {
1✔
653
      return String.valueOf(object);
×
654
    }
655
    if (!skipLevel) {
1✔
656
      if (printTypeName) {
1✔
657
        result.append(object.getClass().getSimpleName());
×
658
        result.append(" ");
×
659
      }
660
      result.append("{");
1✔
661
    }
662
    Method[] eventMethods = object.getClass().getDeclaredMethods();
1✔
663
    boolean first = true;
1✔
664
    for (Method method : eventMethods) {
1✔
665
      String name = method.getName();
1✔
666
      if (!name.startsWith("get")
1✔
667
          || name.equals("getDecisionType")
1✔
668
          || method.getParameterCount() != 0
1✔
669
          || !Modifier.isPublic(method.getModifiers())) {
1✔
670
        continue;
×
671
      }
672
      if (name.equals(methodToSkip) || name.equals("getClass")) {
1✔
673
        continue;
×
674
      }
675
      if (Modifier.isStatic(method.getModifiers())) {
1✔
676
        continue;
×
677
      }
678
      Object value;
679
      try {
680
        value = method.invoke(object, (Object[]) null);
1✔
681
      } catch (InvocationTargetException e) {
×
682
        throw new RuntimeException(e.getTargetException());
×
683
      } catch (RuntimeException e) {
×
684
        throw e;
×
685
      } catch (Exception e) {
×
686
        throw new RuntimeException(e);
×
687
      }
1✔
688
      if (skipNullsAndEmptyCollections) {
1✔
689
        if (value == null) {
1✔
690
          continue;
1✔
691
        }
692
        if (value instanceof Map && ((Map<?, ?>) value).isEmpty()) {
1✔
693
          continue;
×
694
        }
695
        if (value instanceof Collection && ((Collection<?>) value).isEmpty()) {
1✔
696
          continue;
×
697
        }
698
      }
699
      if (!skipLevel) {
1✔
700
        if (first) {
1✔
701
          first = false;
1✔
702
        } else {
703
          result.append(";");
1✔
704
        }
705
        result.append("\n");
1✔
706
        result.append(indentation);
1✔
707
        result.append(INDENTATION);
1✔
708
        result.append(name.substring(3));
1✔
709
        result.append(" = ");
1✔
710
        // Pretty print JSON serialized exceptions.
711
        if (name.equals("getDetails") && value instanceof byte[]) {
1✔
712
          String details = new String((byte[]) value, UTF_8);
×
713
          details = prettyPrintJson(details, INDENTATION + INDENTATION);
×
714
          // GSON pretty prints, but doesn't let to set an initial indentation.
715
          // Thus indenting the pretty printed JSON through regexp :(.
716
          String replacement = "\n" + indentation + INDENTATION;
×
717
          details = details.replaceAll("\\n|\\\\n", replacement);
×
718
          result.append(details);
×
719
          continue;
×
720
        }
721
        result.append(
1✔
722
            prettyPrintObject(
1✔
723
                value,
724
                methodToSkip,
725
                skipNullsAndEmptyCollections,
726
                indentation + INDENTATION,
727
                false,
728
                false));
729
      } else {
730
        result.append(
×
731
            prettyPrintObject(
×
732
                value,
733
                methodToSkip,
734
                skipNullsAndEmptyCollections,
735
                indentation,
736
                false,
737
                printTypeName));
738
      }
739
    }
740
    if (!skipLevel) {
1✔
741
      result.append("\n");
1✔
742
      result.append(indentation);
1✔
743
      result.append("}");
1✔
744
    }
745
    return result.toString();
1✔
746
  }
747

748
  public static boolean containsEvent(List<HistoryEvent> history, EventType eventType) {
749
    for (HistoryEvent event : history) {
1✔
750
      if (event.getEventType() == eventType) {
1✔
751
        return true;
1✔
752
      }
753
    }
1✔
754
    return false;
×
755
  }
756

757
  /**
758
   * Pretty prints JSON. Not a generic utility. Used to prettify Details fields that contain
759
   * serialized exceptions.
760
   */
761
  private static String prettyPrintJson(String jsonValue, String stackIndentation) {
762
    try {
763
      JsonObject json = JsonParser.parseString(jsonValue).getAsJsonObject();
×
764
      fixStackTrace(json, stackIndentation);
×
765
      Gson gson = new GsonBuilder().setPrettyPrinting().create();
×
766
      return gson.toJson(json);
×
767
    } catch (Exception e) {
×
768
      return jsonValue;
×
769
    }
770
  }
771

772
  private static void fixStackTrace(JsonElement json, String stackIndentation) {
773
    if (!json.isJsonObject()) {
×
774
      return;
×
775
    }
776
    for (Entry<String, JsonElement> entry : json.getAsJsonObject().entrySet()) {
×
777
      if ("stackTrace".equals(entry.getKey())) {
×
778
        String value = entry.getValue().getAsString();
×
779
        String replacement = "\n" + stackIndentation;
×
780
        String fixed = value.replaceAll("\\n", replacement);
×
781
        entry.setValue(new JsonPrimitive(fixed));
×
782
        continue;
×
783
      }
784
      fixStackTrace(entry.getValue(), stackIndentation + INDENTATION);
×
785
    }
×
786
  }
×
787

788
  /** Is this an event that was created to mirror a decision? */
789
  public static boolean isDecisionEvent(HistoryEvent event) {
790
    EventType eventType = event.getEventType();
1✔
791
    boolean result =
1✔
792
        ((event != null)
793
            && (eventType == EventType.ActivityTaskScheduled
794
                || eventType == EventType.StartChildWorkflowExecutionInitiated
795
                || eventType == EventType.TimerStarted
796
                || eventType == EventType.WorkflowExecutionCompleted
797
                || eventType == EventType.WorkflowExecutionFailed
798
                || eventType == EventType.WorkflowExecutionCanceled
799
                || eventType == EventType.WorkflowExecutionContinuedAsNew
800
                || eventType == EventType.ActivityTaskCancelRequested
801
                || eventType == EventType.RequestCancelActivityTaskFailed
802
                || eventType == EventType.TimerCanceled
803
                || eventType == EventType.CancelTimerFailed
804
                || eventType == EventType.RequestCancelExternalWorkflowExecutionInitiated
805
                || eventType == EventType.MarkerRecorded
806
                || eventType == EventType.SignalExternalWorkflowExecutionInitiated
807
                || eventType == EventType.UpsertWorkflowSearchAttributes));
808
    return result;
1✔
809
  }
810

811
  public static WorkflowExecutionHistory readHistoryFromResource(String resourceFileName)
812
      throws IOException {
813
    ClassLoader classLoader = WorkflowExecutionUtils.class.getClassLoader();
1✔
814
    String historyUrl = classLoader.getResource(resourceFileName).getFile();
1✔
815
    File historyFile = new File(historyUrl);
1✔
816
    return readHistory(historyFile);
1✔
817
  }
818

819
  public static WorkflowExecutionHistory readHistory(File historyFile) throws IOException {
820
    try (Reader reader = Files.newBufferedReader(historyFile.toPath(), UTF_8)) {
1✔
821
      String jsonHistory = CharStreams.toString(reader);
1✔
822
      return WorkflowExecutionHistory.fromJson(jsonHistory);
1✔
823
    }
824
  }
825
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc