• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing.internal;
22

23
import static org.junit.Assert.assertNotNull;
24
import static org.junit.Assert.assertTrue;
25

26
import com.uber.m3.tally.Scope;
27
import io.temporal.activity.ActivityExecutionContext;
28
import io.temporal.client.ActivityCompletionException;
29
import io.temporal.common.SearchAttributeUpdate;
30
import io.temporal.common.interceptors.*;
31
import io.temporal.internal.sync.WorkflowMethodThreadNameStrategy;
32
import io.temporal.workflow.*;
33
import io.temporal.workflow.unsafe.WorkflowUnsafe;
34
import java.lang.reflect.Type;
35
import java.time.Duration;
36
import java.util.*;
37
import java.util.function.BiPredicate;
38
import java.util.function.Supplier;
39
import javax.annotation.Nonnull;
40
import org.slf4j.Logger;
41
import org.slf4j.LoggerFactory;
42

43
public class TracingWorkerInterceptor implements WorkerInterceptor {
44

45
  private static final Logger log = LoggerFactory.getLogger(TracingWorkerInterceptor.class);
1✔
46

47
  private final FilteredTrace trace;
48
  private List<String> expected;
49

50
  /**
   * Creates an interceptor that records its trace entries into the supplied trace.
   *
   * @param trace the trace that every interceptor created by this instance writes to
   */
  public TracingWorkerInterceptor(FilteredTrace trace) {
    this.trace = trace;
  }
1✔
53

54
  public String getTrace() {
55
    return String.join("\n", trace.getImpl());
1✔
56
  }
57

58
  public void setExpected(String... expected) {
59
    this.expected = Arrays.asList(expected);
1✔
60
  }
1✔
61

62
  public void assertExpected() {
63
    // As it stands, when the trace is empty but the expected list isn't this still passes.
64
    if (expected != null) {
1✔
65
      List<String> traceElements = trace.getImpl();
1✔
66
      for (int i = 0; i < traceElements.size(); i++) {
1✔
67
        String t = traceElements.get(i);
1✔
68
        String expectedRegExp;
69
        if (expected.size() <= i) {
1✔
70
          expectedRegExp = "";
×
71
        } else {
72
          expectedRegExp = expected.get(i);
1✔
73
        }
74
        assertTrue(
1✔
75
            t
76
                + " doesn't match "
77
                + expectedRegExp
78
                + ": \n expected=\n"
79
                + String.join("\n", expected)
1✔
80
                + "\n actual=\n"
81
                + String.join("\n", traceElements)
1✔
82
                + "\n",
83
            t.matches(expectedRegExp));
1✔
84
      }
85
    }
86
  }
1✔
87

88
  @Override
89
  public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
90
    if (!WorkflowUnsafe.isReplaying()) {
1✔
91
      trace.add("interceptExecuteWorkflow " + Workflow.getInfo().getWorkflowId());
1✔
92
    }
93
    return new WorkflowInboundCallsInterceptorBase(next) {
1✔
94
      @Override
95
      public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
96
        next.init(new TracingWorkflowOutboundCallsInterceptor(trace, outboundCalls));
1✔
97
      }
1✔
98

99
      @Override
100
      public void handleSignal(SignalInput input) {
101
        trace.add("handleSignal " + input.getSignalName());
1✔
102
        super.handleSignal(input);
1✔
103
      }
1✔
104

105
      @Override
106
      public QueryOutput handleQuery(QueryInput input) {
107
        trace.add("handleQuery " + input.getQueryName());
1✔
108
        return super.handleQuery(input);
1✔
109
      }
110

111
      @Nonnull
112
      @Override
113
      public Object newWorkflowMethodThread(Runnable runnable, String name) {
114
        if (!WorkflowUnsafe.isReplaying()) {
1✔
115
          if (name.startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX)) {
1✔
116
            // strip the IDs we add to identify WF thread method
117
            trace.add("newThread " + WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX);
1✔
118
          } else {
119
            trace.add("newThread " + name);
×
120
          }
121
        }
122
        return next.newWorkflowMethodThread(runnable, name);
1✔
123
      }
124
    };
125
  }
126

127
  @Override
128
  public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
129
    return new TracingActivityInboundCallsInterceptor(trace, next);
1✔
130
  }
131

132
  public static class FilteredTrace {
1✔
133

134
    private final List<String> impl = Collections.synchronizedList(new ArrayList<>());
1✔
135

136
    public boolean add(String s) {
137
      log.trace("FilteredTrace isReplaying=" + WorkflowUnsafe.isReplaying());
1✔
138
      if (!WorkflowUnsafe.isReplaying()) {
1✔
139
        return impl.add(s);
1✔
140
      }
141
      return true;
1✔
142
    }
143

144
    List<String> getImpl() {
145
      return impl;
1✔
146
    }
147
  }
148

149
  private static class TracingWorkflowOutboundCallsInterceptor
150
      implements WorkflowOutboundCallsInterceptor {
151

152
    private final FilteredTrace trace;
153
    private final WorkflowOutboundCallsInterceptor next;
154

155
    private TracingWorkflowOutboundCallsInterceptor(
156
        FilteredTrace trace, WorkflowOutboundCallsInterceptor next) {
1✔
157
      WorkflowInfo workflowInfo =
158
          Workflow.getInfo(); // checks that info is available in the constructor
1✔
159
      assertNotNull(workflowInfo);
1✔
160
      this.trace = trace;
1✔
161
      this.next = Objects.requireNonNull(next);
1✔
162
    }
1✔
163

164
    @Override
165
    public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
166
      if (!WorkflowUnsafe.isReplaying()) {
1✔
167
        trace.add("executeActivity " + input.getActivityName());
1✔
168
      }
169
      return next.executeActivity(input);
1✔
170
    }
171

172
    @Override
173
    public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
174
      if (!WorkflowUnsafe.isReplaying()) {
1✔
175
        trace.add("executeLocalActivity " + input.getActivityName());
1✔
176
      }
177
      return next.executeLocalActivity(input);
1✔
178
    }
179

180
    @Override
181
    public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
182
      if (!WorkflowUnsafe.isReplaying()) {
1✔
183
        trace.add("executeChildWorkflow " + input.getWorkflowType());
1✔
184
      }
185
      return next.executeChildWorkflow(input);
1✔
186
    }
187

188
    @Override
189
    public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
190
        ExecuteNexusOperationInput<R> input) {
191
      if (!WorkflowUnsafe.isReplaying()) {
1✔
192
        trace.add("executeNexusOperation " + input.getOperation());
1✔
193
      }
194
      return next.executeNexusOperation(input);
1✔
195
    }
196

197
    @Override
198
    public Random newRandom() {
199
      if (!WorkflowUnsafe.isReplaying()) {
×
200
        trace.add("newRandom");
×
201
      }
202
      return next.newRandom();
×
203
    }
204

205
    @Override
206
    public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
207
      if (!WorkflowUnsafe.isReplaying()) {
1✔
208
        trace.add(
1✔
209
            "signalExternalWorkflow "
210
                + input.getExecution().getWorkflowId()
1✔
211
                + " "
212
                + input.getSignalName());
1✔
213
      }
214
      return next.signalExternalWorkflow(input);
1✔
215
    }
216

217
    @Override
218
    public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
219
      if (!WorkflowUnsafe.isReplaying()) {
×
220
        trace.add("cancelWorkflow " + input.getExecution().getWorkflowId());
×
221
      }
222
      return next.cancelWorkflow(input);
×
223
    }
224

225
    @Override
226
    public void sleep(Duration duration) {
227
      if (!WorkflowUnsafe.isReplaying()) {
1✔
228
        trace.add("sleep " + duration);
1✔
229
      }
230
      next.sleep(duration);
1✔
231
    }
1✔
232

233
    @Override
234
    public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
235
      if (!WorkflowUnsafe.isReplaying()) {
1✔
236
        trace.add("await " + timeout + " " + reason);
1✔
237
      }
238
      return next.await(timeout, reason, unblockCondition);
1✔
239
    }
240

241
    @Override
242
    public void await(String reason, Supplier<Boolean> unblockCondition) {
243
      if (!WorkflowUnsafe.isReplaying()) {
1✔
244
        trace.add("await " + reason);
1✔
245
      }
246
      next.await(reason, unblockCondition);
1✔
247
    }
1✔
248

249
    @Override
250
    public Promise<Void> newTimer(Duration duration) {
251
      if (!WorkflowUnsafe.isReplaying()) {
1✔
252
        trace.add("newTimer " + duration);
1✔
253
      }
254
      return next.newTimer(duration);
1✔
255
    }
256

257
    @Override
258
    public Promise<Void> newTimer(Duration duration, TimerOptions options) {
259
      if (!WorkflowUnsafe.isReplaying()) {
×
260
        trace.add("newTimer " + duration);
×
261
      }
262
      return next.newTimer(duration, options);
×
263
    }
264

265
    @Override
266
    public <R> R sideEffect(Class<R> resultClass, Type resultType, Functions.Func<R> func) {
267
      if (!WorkflowUnsafe.isReplaying()) {
1✔
268
        trace.add("sideEffect");
1✔
269
      }
270
      return next.sideEffect(resultClass, resultType, func);
1✔
271
    }
272

273
    @Override
274
    public <R> R mutableSideEffect(
275
        String id,
276
        Class<R> resultClass,
277
        Type resultType,
278
        BiPredicate<R, R> updated,
279
        Functions.Func<R> func) {
280
      if (!WorkflowUnsafe.isReplaying()) {
1✔
281
        trace.add("mutableSideEffect");
1✔
282
      }
283
      return next.mutableSideEffect(id, resultClass, resultType, updated, func);
1✔
284
    }
285

286
    @Override
287
    public int getVersion(String changeId, int minSupported, int maxSupported) {
288
      if (!WorkflowUnsafe.isReplaying()) {
1✔
289
        trace.add("getVersion");
1✔
290
      }
291
      return next.getVersion(changeId, minSupported, maxSupported);
1✔
292
    }
293

294
    @Override
295
    public void continueAsNew(ContinueAsNewInput input) {
296
      if (!WorkflowUnsafe.isReplaying()) {
1✔
297
        trace.add("continueAsNew");
1✔
298
      }
299
      next.continueAsNew(input);
×
300
    }
×
301

302
    @Override
303
    public void registerQuery(RegisterQueryInput input) {
304
      String queryType = input.getQueryType();
1✔
305
      if (!WorkflowUnsafe.isReplaying()) {
1✔
306
        trace.add("registerQuery " + queryType);
1✔
307
      }
308
      next.registerQuery(
1✔
309
          new RegisterQueryInput(
310
              queryType,
311
              input.getArgTypes(),
1✔
312
              input.getGenericArgTypes(),
1✔
313
              (args) -> {
314
                Object result = input.getCallback().apply(args);
1✔
315
                if (!WorkflowUnsafe.isReplaying()) {
1✔
316
                  if (queryType.equals("query")) {
1✔
317
                    log.trace("query", new Throwable());
1✔
318
                  }
319
                  trace.add("query " + queryType);
1✔
320
                }
321
                return result;
1✔
322
              }));
323
    }
1✔
324

325
    @Override
326
    public void registerSignalHandlers(RegisterSignalHandlersInput input) {
327
      if (!WorkflowUnsafe.isReplaying()) {
1✔
328
        StringBuilder signals = new StringBuilder();
1✔
329
        for (SignalRegistrationRequest request : input.getRequests()) {
1✔
330
          if (signals.length() > 0) {
1✔
331
            signals.append(", ");
1✔
332
          }
333
          signals.append(request.getSignalType());
1✔
334
        }
1✔
335
        trace.add("registerSignalHandlers " + signals);
1✔
336
      }
337
      next.registerSignalHandlers(input);
1✔
338
    }
1✔
339

340
    @Override
341
    public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
342
      if (!WorkflowUnsafe.isReplaying()) {
1✔
343
        StringBuilder updates = new StringBuilder();
1✔
344
        for (UpdateRegistrationRequest request : input.getRequests()) {
1✔
345
          if (updates.length() > 0) {
1✔
346
            updates.append(", ");
1✔
347
          }
348
          updates.append(request.getUpdateName());
1✔
349
        }
1✔
350
        trace.add("registerUpdateHandlers " + updates);
1✔
351
      }
352
      next.registerUpdateHandlers(input);
1✔
353
    }
1✔
354

355
    @Override
356
    public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
357
      if (!WorkflowUnsafe.isReplaying()) {
1✔
358
        trace.add("registerDynamicUpdateHandler");
1✔
359
      }
360
      next.registerDynamicUpdateHandler(input);
1✔
361
    }
1✔
362

363
    @Override
364
    public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
365
      if (!WorkflowUnsafe.isReplaying()) {
1✔
366
        trace.add("registerDynamicSignalHandler");
1✔
367
      }
368
      next.registerDynamicSignalHandler(input);
1✔
369
    }
1✔
370

371
    @Override
372
    public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
373
      if (!WorkflowUnsafe.isReplaying()) {
1✔
374
        trace.add("registerDynamicQueryHandler");
1✔
375
      }
376
      next.registerDynamicQueryHandler(input);
1✔
377
    }
1✔
378

379
    @Override
380
    public UUID randomUUID() {
381
      if (!WorkflowUnsafe.isReplaying()) {
×
382
        trace.add("randomUUID");
×
383
      }
384
      return next.randomUUID();
×
385
    }
386

387
    @Override
388
    public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
389
      if (!WorkflowUnsafe.isReplaying()) {
1✔
390
        trace.add("upsertSearchAttributes");
1✔
391
      }
392
      next.upsertSearchAttributes(searchAttributes);
1✔
393
    }
1✔
394

395
    @Override
396
    public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
397
      if (!WorkflowUnsafe.isReplaying()) {
1✔
398
        trace.add("upsertTypedSearchAttributes");
1✔
399
      }
400
      next.upsertTypedSearchAttributes(searchAttributeUpdates);
1✔
401
    }
1✔
402

403
    @Override
404
    public void upsertMemo(Map<String, Object> memo) {
405
      if (!WorkflowUnsafe.isReplaying()) {
×
406
        trace.add("upsertMemo");
×
407
      }
408
      next.upsertMemo(memo);
×
409
    }
×
410

411
    @Override
412
    public Scope getMetricsScope() {
413
      if (!WorkflowUnsafe.isReplaying()) {
×
414
        trace.add("getMetricsScope");
×
415
      }
416
      return next.getMetricsScope();
×
417
    }
418

419
    @Override
420
    public Object newChildThread(Runnable runnable, boolean detached, String name) {
421
      if (!WorkflowUnsafe.isReplaying()) {
1✔
422
        trace.add("newThread " + name);
1✔
423
      }
424
      return next.newChildThread(runnable, detached, name);
1✔
425
    }
426

427
    @Override
428
    public long currentTimeMillis() {
429
      if (!WorkflowUnsafe.isReplaying()) {
1✔
430
        trace.add("currentTimeMillis");
1✔
431
      }
432
      return next.currentTimeMillis();
1✔
433
    }
434
  }
435

436
  private static class TracingActivityInboundCallsInterceptor
437
      implements ActivityInboundCallsInterceptor {
438

439
    private final FilteredTrace trace;
440
    private final ActivityInboundCallsInterceptor next;
441
    private String type;
442
    private boolean local;
443

444
    public TracingActivityInboundCallsInterceptor(
445
        FilteredTrace trace, ActivityInboundCallsInterceptor next) {
1✔
446
      this.trace = trace;
1✔
447
      this.next = next;
1✔
448
    }
1✔
449

450
    @Override
451
    public void init(ActivityExecutionContext context) {
452
      this.type = context.getInfo().getActivityType();
1✔
453
      this.local = context.getInfo().isLocal();
1✔
454
      next.init(
1✔
455
          new ActivityExecutionContextBase(context) {
1✔
456
            @Override
457
            public <V> void heartbeat(V details) throws ActivityCompletionException {
458
              trace.add("heartbeat " + details);
1✔
459
              super.heartbeat(details);
1✔
460
            }
1✔
461
          });
462
    }
1✔
463

464
    @Override
465
    public ActivityOutput execute(ActivityInput input) {
466
      trace.add((local ? "local " : "") + "activity " + type);
1✔
467
      return next.execute(input);
1✔
468
    }
469
  }
470
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc