• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.87
/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing.internal;
22

23
import static org.junit.Assert.assertNotNull;
24
import static org.junit.Assert.assertTrue;
25

26
import com.uber.m3.tally.Scope;
27
import io.temporal.activity.ActivityExecutionContext;
28
import io.temporal.client.ActivityCompletionException;
29
import io.temporal.common.SearchAttributeUpdate;
30
import io.temporal.common.interceptors.*;
31
import io.temporal.internal.sync.WorkflowMethodThreadNameStrategy;
32
import io.temporal.workflow.*;
33
import io.temporal.workflow.unsafe.WorkflowUnsafe;
34
import java.lang.reflect.Type;
35
import java.time.Duration;
36
import java.util.*;
37
import java.util.function.BiPredicate;
38
import java.util.function.Supplier;
39
import javax.annotation.Nonnull;
40
import org.slf4j.Logger;
41
import org.slf4j.LoggerFactory;
42

43
public class TracingWorkerInterceptor implements WorkerInterceptor {
44

45
  private static final Logger log = LoggerFactory.getLogger(TracingWorkerInterceptor.class);
1✔
46

47
  private final FilteredTrace trace;
48
  private List<String> expected;
49

50
  /**
   * Creates an interceptor that records intercepted workflow and activity calls into {@code
   * trace}.
   *
   * @param trace sink that receives the recorded entries; must not be null
   * @throws NullPointerException if {@code trace} is null
   */
  public TracingWorkerInterceptor(FilteredTrace trace) {
    // Fail fast here instead of with an NPE on the first intercepted call.
    this.trace = Objects.requireNonNull(trace, "trace");
  }
1✔
53

54
  public String getTrace() {
55
    return String.join("\n", trace.getImpl());
1✔
56
  }
57

58
  public void setExpected(String... expected) {
59
    this.expected = Arrays.asList(expected);
1✔
60
  }
1✔
61

62
  public void assertExpected() {
63
    // As it stands, when the trace is empty but the expected list isn't this still passes.
64
    if (expected != null) {
1✔
65
      List<String> traceElements = trace.getImpl();
1✔
66
      for (int i = 0; i < traceElements.size(); i++) {
1✔
67
        String t = traceElements.get(i);
1✔
68
        String expectedRegExp;
69
        if (expected.size() <= i) {
1!
70
          expectedRegExp = "";
×
71
        } else {
72
          expectedRegExp = expected.get(i);
1✔
73
        }
74
        assertTrue(
1✔
75
            t
76
                + " doesn't match "
77
                + expectedRegExp
78
                + ": \n expected=\n"
79
                + String.join("\n", expected)
1✔
80
                + "\n actual=\n"
81
                + String.join("\n", traceElements)
1✔
82
                + "\n",
83
            t.matches(expectedRegExp));
1✔
84
      }
85
    }
86
  }
1✔
87

88
  @Override
89
  public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
90
    if (!WorkflowUnsafe.isReplaying()) {
1✔
91
      trace.add("interceptExecuteWorkflow " + Workflow.getInfo().getWorkflowId());
1✔
92
    }
93
    return new WorkflowInboundCallsInterceptorBase(next) {
1✔
94
      @Override
95
      public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
96
        next.init(new TracingWorkflowOutboundCallsInterceptor(trace, outboundCalls));
1✔
97
      }
1✔
98

99
      @Override
100
      public void handleSignal(SignalInput input) {
101
        trace.add("handleSignal " + input.getSignalName());
1✔
102
        super.handleSignal(input);
1✔
103
      }
1✔
104

105
      @Override
106
      public QueryOutput handleQuery(QueryInput input) {
107
        trace.add("handleQuery " + input.getQueryName());
1✔
108
        return super.handleQuery(input);
1✔
109
      }
110

111
      @Nonnull
112
      @Override
113
      public Object newWorkflowMethodThread(Runnable runnable, String name) {
114
        if (!WorkflowUnsafe.isReplaying()) {
1✔
115
          if (name.startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX)) {
1!
116
            // strip the IDs we add to identify WF thread method
117
            trace.add("newThread " + WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX);
1✔
118
          } else {
119
            trace.add("newThread " + name);
×
120
          }
121
        }
122
        return next.newWorkflowMethodThread(runnable, name);
1✔
123
      }
124
    };
125
  }
126

127
  @Override
128
  public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
129
    return new TracingActivityInboundCallsInterceptor(trace, next);
1✔
130
  }
131

132
  public static class FilteredTrace {
1✔
133

134
    private final List<String> impl = Collections.synchronizedList(new ArrayList<>());
1✔
135

136
    public boolean add(String s) {
137
      log.trace("FilteredTrace isReplaying=" + WorkflowUnsafe.isReplaying());
1✔
138
      if (!WorkflowUnsafe.isReplaying()) {
1✔
139
        return impl.add(s);
1✔
140
      }
141
      return true;
1✔
142
    }
143

144
    List<String> getImpl() {
145
      return impl;
1✔
146
    }
147
  }
148

149
  private static class TracingWorkflowOutboundCallsInterceptor
150
      implements WorkflowOutboundCallsInterceptor {
151

152
    private final FilteredTrace trace;
153
    private final WorkflowOutboundCallsInterceptor next;
154

155
    private TracingWorkflowOutboundCallsInterceptor(
156
        FilteredTrace trace, WorkflowOutboundCallsInterceptor next) {
1✔
157
      WorkflowInfo workflowInfo =
158
          Workflow.getInfo(); // checks that info is available in the constructor
1✔
159
      assertNotNull(workflowInfo);
1✔
160
      this.trace = trace;
1✔
161
      this.next = Objects.requireNonNull(next);
1✔
162
    }
1✔
163

164
    @Override
165
    public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
166
      if (!WorkflowUnsafe.isReplaying()) {
1✔
167
        trace.add("executeActivity " + input.getActivityName());
1✔
168
      }
169
      return next.executeActivity(input);
1✔
170
    }
171

172
    @Override
173
    public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
174
      if (!WorkflowUnsafe.isReplaying()) {
1✔
175
        trace.add("executeLocalActivity " + input.getActivityName());
1✔
176
      }
177
      return next.executeLocalActivity(input);
1✔
178
    }
179

180
    @Override
181
    public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
182
      if (!WorkflowUnsafe.isReplaying()) {
1✔
183
        trace.add("executeChildWorkflow " + input.getWorkflowType());
1✔
184
      }
185
      return next.executeChildWorkflow(input);
1✔
186
    }
187

188
    @Override
189
    public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
190
        ExecuteNexusOperationInput<R> input) {
191
      if (!WorkflowUnsafe.isReplaying()) {
1!
192
        trace.add("executeNexusOperation " + input.getOperation());
1✔
193
      }
194
      return next.executeNexusOperation(input);
1✔
195
    }
196

197
    @Override
198
    public Random newRandom() {
199
      if (!WorkflowUnsafe.isReplaying()) {
×
200
        trace.add("newRandom");
×
201
      }
202
      return next.newRandom();
×
203
    }
204

205
    @Override
206
    public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
207
      if (!WorkflowUnsafe.isReplaying()) {
1!
208
        trace.add(
1✔
209
            "signalExternalWorkflow "
210
                + input.getExecution().getWorkflowId()
1✔
211
                + " "
212
                + input.getSignalName());
1✔
213
      }
214
      return next.signalExternalWorkflow(input);
1✔
215
    }
216

217
    @Override
218
    public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
219
      if (!WorkflowUnsafe.isReplaying()) {
×
220
        trace.add("cancelWorkflow " + input.getExecution().getWorkflowId());
×
221
      }
222
      return next.cancelWorkflow(input);
×
223
    }
224

225
    @Override
226
    public void sleep(Duration duration) {
227
      if (!WorkflowUnsafe.isReplaying()) {
1✔
228
        trace.add("sleep " + duration);
1✔
229
      }
230
      next.sleep(duration);
1✔
231
    }
1✔
232

233
    @Override
234
    public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
235
      if (!WorkflowUnsafe.isReplaying()) {
1!
236
        trace.add("await " + timeout + " " + reason);
1✔
237
      }
238
      return next.await(timeout, reason, unblockCondition);
1✔
239
    }
240

241
    @Override
242
    public void await(String reason, Supplier<Boolean> unblockCondition) {
243
      if (!WorkflowUnsafe.isReplaying()) {
1✔
244
        trace.add("await " + reason);
1✔
245
      }
246
      next.await(reason, unblockCondition);
1✔
247
    }
1✔
248

249
    @Override
250
    public Promise<Void> newTimer(Duration duration) {
251
      if (!WorkflowUnsafe.isReplaying()) {
1✔
252
        trace.add("newTimer " + duration);
1✔
253
      }
254
      return next.newTimer(duration);
1✔
255
    }
256

257
    @Override
258
    public Promise<Void> newTimer(Duration duration, TimerOptions options) {
259
      if (!WorkflowUnsafe.isReplaying()) {
×
260
        trace.add("newTimer " + duration);
×
261
      }
262
      return next.newTimer(duration, options);
×
263
    }
264

265
    @Override
266
    public <R> R sideEffect(Class<R> resultClass, Type resultType, Functions.Func<R> func) {
267
      if (!WorkflowUnsafe.isReplaying()) {
1✔
268
        trace.add("sideEffect");
1✔
269
      }
270
      return next.sideEffect(resultClass, resultType, func);
1✔
271
    }
272

273
    @Override
274
    public <R> R mutableSideEffect(
275
        String id,
276
        Class<R> resultClass,
277
        Type resultType,
278
        BiPredicate<R, R> updated,
279
        Functions.Func<R> func) {
280
      if (!WorkflowUnsafe.isReplaying()) {
1✔
281
        trace.add("mutableSideEffect");
1✔
282
      }
283
      return next.mutableSideEffect(id, resultClass, resultType, updated, func);
1✔
284
    }
285

286
    @Override
287
    public int getVersion(String changeId, int minSupported, int maxSupported) {
288
      if (!WorkflowUnsafe.isReplaying()) {
1✔
289
        trace.add("getVersion");
1✔
290
      }
291
      return next.getVersion(changeId, minSupported, maxSupported);
1✔
292
    }
293

294
    @Override
295
    public void continueAsNew(ContinueAsNewInput input) {
296
      if (!WorkflowUnsafe.isReplaying()) {
1✔
297
        trace.add("continueAsNew");
1✔
298
      }
299
      next.continueAsNew(input);
×
300
    }
×
301

302
    @Override
303
    public void registerQuery(RegisterQueryInput input) {
304
      String queryType = input.getQueryType();
1✔
305
      if (!WorkflowUnsafe.isReplaying()) {
1✔
306
        trace.add("registerQuery " + queryType);
1✔
307
      }
308
      next.registerQuery(
1✔
309
          new RegisterQueryInput(
310
              queryType,
311
              input.getDescription(),
1✔
312
              input.getArgTypes(),
1✔
313
              input.getGenericArgTypes(),
1✔
314
              (args) -> {
315
                Object result = input.getCallback().apply(args);
1✔
316
                if (!WorkflowUnsafe.isReplaying()) {
1!
317
                  if (queryType.equals("query")) {
1✔
318
                    log.trace("query", new Throwable());
1✔
319
                  }
320
                  trace.add("query " + queryType);
1✔
321
                }
322
                return result;
1✔
323
              }));
324
    }
1✔
325

326
    @Override
327
    public void registerSignalHandlers(RegisterSignalHandlersInput input) {
328
      if (!WorkflowUnsafe.isReplaying()) {
1✔
329
        StringBuilder signals = new StringBuilder();
1✔
330
        for (SignalRegistrationRequest request : input.getRequests()) {
1✔
331
          if (signals.length() > 0) {
1✔
332
            signals.append(", ");
1✔
333
          }
334
          signals.append(request.getSignalType());
1✔
335
        }
1✔
336
        trace.add("registerSignalHandlers " + signals);
1✔
337
      }
338
      next.registerSignalHandlers(input);
1✔
339
    }
1✔
340

341
    @Override
342
    public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
343
      if (!WorkflowUnsafe.isReplaying()) {
1✔
344
        StringBuilder updates = new StringBuilder();
1✔
345
        for (UpdateRegistrationRequest request : input.getRequests()) {
1✔
346
          if (updates.length() > 0) {
1✔
347
            updates.append(", ");
1✔
348
          }
349
          updates.append(request.getUpdateName());
1✔
350
        }
1✔
351
        trace.add("registerUpdateHandlers " + updates);
1✔
352
      }
353
      next.registerUpdateHandlers(input);
1✔
354
    }
1✔
355

356
    @Override
357
    public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
358
      if (!WorkflowUnsafe.isReplaying()) {
1✔
359
        trace.add("registerDynamicUpdateHandler");
1✔
360
      }
361
      next.registerDynamicUpdateHandler(input);
1✔
362
    }
1✔
363

364
    @Override
365
    public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
366
      if (!WorkflowUnsafe.isReplaying()) {
1✔
367
        trace.add("registerDynamicSignalHandler");
1✔
368
      }
369
      next.registerDynamicSignalHandler(input);
1✔
370
    }
1✔
371

372
    @Override
373
    public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
374
      if (!WorkflowUnsafe.isReplaying()) {
1✔
375
        trace.add("registerDynamicQueryHandler");
1✔
376
      }
377
      next.registerDynamicQueryHandler(input);
1✔
378
    }
1✔
379

380
    @Override
381
    public UUID randomUUID() {
382
      if (!WorkflowUnsafe.isReplaying()) {
×
383
        trace.add("randomUUID");
×
384
      }
385
      return next.randomUUID();
×
386
    }
387

388
    @Override
389
    public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
390
      if (!WorkflowUnsafe.isReplaying()) {
1✔
391
        trace.add("upsertSearchAttributes");
1✔
392
      }
393
      next.upsertSearchAttributes(searchAttributes);
1✔
394
    }
1✔
395

396
    @Override
397
    public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
398
      if (!WorkflowUnsafe.isReplaying()) {
1✔
399
        trace.add("upsertTypedSearchAttributes");
1✔
400
      }
401
      next.upsertTypedSearchAttributes(searchAttributeUpdates);
1✔
402
    }
1✔
403

404
    @Override
405
    public void upsertMemo(Map<String, Object> memo) {
406
      if (!WorkflowUnsafe.isReplaying()) {
×
407
        trace.add("upsertMemo");
×
408
      }
409
      next.upsertMemo(memo);
×
410
    }
×
411

412
    @Override
413
    public Scope getMetricsScope() {
414
      if (!WorkflowUnsafe.isReplaying()) {
×
415
        trace.add("getMetricsScope");
×
416
      }
417
      return next.getMetricsScope();
×
418
    }
419

420
    @Override
421
    public Object newChildThread(Runnable runnable, boolean detached, String name) {
422
      if (!WorkflowUnsafe.isReplaying()) {
1✔
423
        trace.add("newThread " + name);
1✔
424
      }
425
      return next.newChildThread(runnable, detached, name);
1✔
426
    }
427

428
    @Override
429
    public long currentTimeMillis() {
430
      if (!WorkflowUnsafe.isReplaying()) {
1✔
431
        trace.add("currentTimeMillis");
1✔
432
      }
433
      return next.currentTimeMillis();
1✔
434
    }
435
  }
436

437
  private static class TracingActivityInboundCallsInterceptor
438
      implements ActivityInboundCallsInterceptor {
439

440
    private final FilteredTrace trace;
441
    private final ActivityInboundCallsInterceptor next;
442
    private String type;
443
    private boolean local;
444

445
    public TracingActivityInboundCallsInterceptor(
446
        FilteredTrace trace, ActivityInboundCallsInterceptor next) {
1✔
447
      this.trace = trace;
1✔
448
      this.next = next;
1✔
449
    }
1✔
450

451
    @Override
452
    public void init(ActivityExecutionContext context) {
453
      this.type = context.getInfo().getActivityType();
1✔
454
      this.local = context.getInfo().isLocal();
1✔
455
      next.init(
1✔
456
          new ActivityExecutionContextBase(context) {
1✔
457
            @Override
458
            public <V> void heartbeat(V details) throws ActivityCompletionException {
459
              trace.add("heartbeat " + details);
1✔
460
              super.heartbeat(details);
1✔
461
            }
1✔
462
          });
463
    }
1✔
464

465
    @Override
466
    public ActivityOutput execute(ActivityInput input) {
467
      trace.add((local ? "local " : "") + "activity " + type);
1✔
468
      return next.execute(input);
1✔
469
    }
470
  }
471
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc