• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #175

pending completion
#175

push

github-actions

web-flow
Worker / Build Id versioning (#1786)

Implement new worker build id based versioning feature

236 of 236 new or added lines in 24 files covered. (100.0%)

18343 of 23697 relevant lines covered (77.41%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.04
/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing.internal;
22

23
import static org.junit.Assert.assertNotNull;
24
import static org.junit.Assert.assertTrue;
25

26
import io.temporal.activity.ActivityExecutionContext;
27
import io.temporal.client.ActivityCompletionException;
28
import io.temporal.common.SearchAttributeUpdate;
29
import io.temporal.common.interceptors.*;
30
import io.temporal.internal.sync.WorkflowMethodThreadNameStrategy;
31
import io.temporal.workflow.Functions;
32
import io.temporal.workflow.Promise;
33
import io.temporal.workflow.Workflow;
34
import io.temporal.workflow.WorkflowInfo;
35
import io.temporal.workflow.unsafe.WorkflowUnsafe;
36
import java.lang.reflect.Type;
37
import java.time.Duration;
38
import java.util.*;
39
import java.util.function.BiPredicate;
40
import java.util.function.Supplier;
41
import javax.annotation.Nonnull;
42
import org.slf4j.Logger;
43
import org.slf4j.LoggerFactory;
44

45
public class TracingWorkerInterceptor implements WorkerInterceptor {
46

47
  private static final Logger log = LoggerFactory.getLogger(TracingWorkerInterceptor.class);
1✔
48

49
  private final FilteredTrace trace;
50
  private List<String> expected;
51

52
  public TracingWorkerInterceptor(FilteredTrace trace) {
1✔
53
    this.trace = trace;
1✔
54
  }
1✔
55

56
  public String getTrace() {
57
    return String.join("\n", trace.getImpl());
1✔
58
  }
59

60
  public void setExpected(String... expected) {
61
    this.expected = Arrays.asList(expected);
1✔
62
  }
1✔
63

64
  public void assertExpected() {
65
    // As it stands, when the trace is empty but the expected list isn't this still passes.
66
    if (expected != null) {
×
67
      List<String> traceElements = trace.getImpl();
×
68
      for (int i = 0; i < traceElements.size(); i++) {
×
69
        String t = traceElements.get(i);
×
70
        String expectedRegExp;
71
        if (expected.size() <= i) {
×
72
          expectedRegExp = "";
×
73
        } else {
74
          expectedRegExp = expected.get(i);
×
75
        }
76
        assertTrue(
×
77
            t
78
                + " doesn't match "
79
                + expectedRegExp
80
                + ": \n expected=\n"
81
                + String.join("\n", expected)
×
82
                + "\n actual=\n"
83
                + String.join("\n", traceElements)
×
84
                + "\n",
85
            t.matches(expectedRegExp));
×
86
      }
87
    }
88
  }
×
89

90
  @Override
91
  public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
92
    if (!WorkflowUnsafe.isReplaying()) {
1✔
93
      trace.add("interceptExecuteWorkflow " + Workflow.getInfo().getWorkflowId());
1✔
94
    }
95
    return new WorkflowInboundCallsInterceptorBase(next) {
1✔
96
      @Override
97
      public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
98
        next.init(new TracingWorkflowOutboundCallsInterceptor(trace, outboundCalls));
1✔
99
      }
1✔
100

101
      @Override
102
      public void handleSignal(SignalInput input) {
103
        trace.add("handleSignal " + input.getSignalName());
1✔
104
        super.handleSignal(input);
1✔
105
      }
1✔
106

107
      @Override
108
      public QueryOutput handleQuery(QueryInput input) {
109
        trace.add("handleQuery " + input.getQueryName());
1✔
110
        return super.handleQuery(input);
1✔
111
      }
112

113
      @Nonnull
114
      @Override
115
      public Object newWorkflowMethodThread(Runnable runnable, String name) {
116
        if (!WorkflowUnsafe.isReplaying()) {
1✔
117
          if (name.startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX)) {
1✔
118
            // strip the IDs we add to identify WF thread method
119
            trace.add("newThread " + WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX);
1✔
120
          } else {
121
            trace.add("newThread " + name);
×
122
          }
123
        }
124
        return next.newWorkflowMethodThread(runnable, name);
1✔
125
      }
126
    };
127
  }
128

129
  @Override
130
  public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
131
    return new TracingActivityInboundCallsInterceptor(trace, next);
1✔
132
  }
133

134
  public static class FilteredTrace {
1✔
135

136
    private final List<String> impl = Collections.synchronizedList(new ArrayList<>());
1✔
137

138
    public boolean add(String s) {
139
      log.trace("FilteredTrace isReplaying=" + WorkflowUnsafe.isReplaying());
1✔
140
      if (!WorkflowUnsafe.isReplaying()) {
1✔
141
        return impl.add(s);
1✔
142
      }
143
      return true;
1✔
144
    }
145

146
    List<String> getImpl() {
147
      return impl;
1✔
148
    }
149
  }
150

151
  private static class TracingWorkflowOutboundCallsInterceptor
152
      implements WorkflowOutboundCallsInterceptor {
153

154
    private final FilteredTrace trace;
155
    private final WorkflowOutboundCallsInterceptor next;
156

157
    private TracingWorkflowOutboundCallsInterceptor(
158
        FilteredTrace trace, WorkflowOutboundCallsInterceptor next) {
1✔
159
      WorkflowInfo workflowInfo =
160
          Workflow.getInfo(); // checks that info is available in the constructor
1✔
161
      assertNotNull(workflowInfo);
1✔
162
      this.trace = trace;
1✔
163
      this.next = Objects.requireNonNull(next);
1✔
164
    }
1✔
165

166
    @Override
167
    public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
168
      if (!WorkflowUnsafe.isReplaying()) {
1✔
169
        trace.add("executeActivity " + input.getActivityName());
1✔
170
      }
171
      return next.executeActivity(input);
1✔
172
    }
173

174
    @Override
175
    public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
176
      if (!WorkflowUnsafe.isReplaying()) {
1✔
177
        trace.add("executeLocalActivity " + input.getActivityName());
1✔
178
      }
179
      return next.executeLocalActivity(input);
1✔
180
    }
181

182
    @Override
183
    public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
184
      if (!WorkflowUnsafe.isReplaying()) {
1✔
185
        trace.add("executeChildWorkflow " + input.getWorkflowType());
1✔
186
      }
187
      return next.executeChildWorkflow(input);
1✔
188
    }
189

190
    @Override
191
    public Random newRandom() {
192
      if (!WorkflowUnsafe.isReplaying()) {
×
193
        trace.add("newRandom");
×
194
      }
195
      return next.newRandom();
×
196
    }
197

198
    @Override
199
    public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
200
      if (!WorkflowUnsafe.isReplaying()) {
1✔
201
        trace.add(
1✔
202
            "signalExternalWorkflow "
203
                + input.getExecution().getWorkflowId()
1✔
204
                + " "
205
                + input.getSignalName());
1✔
206
      }
207
      return next.signalExternalWorkflow(input);
1✔
208
    }
209

210
    @Override
211
    public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
212
      if (!WorkflowUnsafe.isReplaying()) {
×
213
        trace.add("cancelWorkflow " + input.getExecution().getWorkflowId());
×
214
      }
215
      return next.cancelWorkflow(input);
×
216
    }
217

218
    @Override
219
    public void sleep(Duration duration) {
220
      if (!WorkflowUnsafe.isReplaying()) {
1✔
221
        trace.add("sleep " + duration);
1✔
222
      }
223
      next.sleep(duration);
1✔
224
    }
1✔
225

226
    @Override
227
    public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
228
      if (!WorkflowUnsafe.isReplaying()) {
1✔
229
        trace.add("await " + timeout + " " + reason);
1✔
230
      }
231
      return next.await(timeout, reason, unblockCondition);
1✔
232
    }
233

234
    @Override
235
    public void await(String reason, Supplier<Boolean> unblockCondition) {
236
      if (!WorkflowUnsafe.isReplaying()) {
1✔
237
        trace.add("await " + reason);
1✔
238
      }
239
      next.await(reason, unblockCondition);
1✔
240
    }
1✔
241

242
    @Override
243
    public Promise<Void> newTimer(Duration duration) {
244
      if (!WorkflowUnsafe.isReplaying()) {
1✔
245
        trace.add("newTimer " + duration);
1✔
246
      }
247
      return next.newTimer(duration);
1✔
248
    }
249

250
    @Override
251
    public <R> R sideEffect(Class<R> resultClass, Type resultType, Functions.Func<R> func) {
252
      if (!WorkflowUnsafe.isReplaying()) {
1✔
253
        trace.add("sideEffect");
1✔
254
      }
255
      return next.sideEffect(resultClass, resultType, func);
1✔
256
    }
257

258
    @Override
259
    public <R> R mutableSideEffect(
260
        String id,
261
        Class<R> resultClass,
262
        Type resultType,
263
        BiPredicate<R, R> updated,
264
        Functions.Func<R> func) {
265
      if (!WorkflowUnsafe.isReplaying()) {
1✔
266
        trace.add("mutableSideEffect");
1✔
267
      }
268
      return next.mutableSideEffect(id, resultClass, resultType, updated, func);
1✔
269
    }
270

271
    @Override
272
    public int getVersion(String changeId, int minSupported, int maxSupported) {
273
      if (!WorkflowUnsafe.isReplaying()) {
1✔
274
        trace.add("getVersion");
1✔
275
      }
276
      return next.getVersion(changeId, minSupported, maxSupported);
1✔
277
    }
278

279
    @Override
280
    public void continueAsNew(ContinueAsNewInput input) {
281
      if (!WorkflowUnsafe.isReplaying()) {
1✔
282
        trace.add("continueAsNew");
1✔
283
      }
284
      next.continueAsNew(input);
×
285
    }
×
286

287
    @Override
288
    public void registerQuery(RegisterQueryInput input) {
289
      String queryType = input.getQueryType();
1✔
290
      if (!WorkflowUnsafe.isReplaying()) {
1✔
291
        trace.add("registerQuery " + queryType);
1✔
292
      }
293
      next.registerQuery(
1✔
294
          new RegisterQueryInput(
295
              queryType,
296
              input.getArgTypes(),
1✔
297
              input.getGenericArgTypes(),
1✔
298
              (args) -> {
299
                Object result = input.getCallback().apply(args);
1✔
300
                if (!WorkflowUnsafe.isReplaying()) {
1✔
301
                  if (queryType.equals("query")) {
1✔
302
                    log.trace("query", new Throwable());
1✔
303
                  }
304
                  trace.add("query " + queryType);
1✔
305
                }
306
                return result;
1✔
307
              }));
308
    }
1✔
309

310
    @Override
311
    public void registerSignalHandlers(RegisterSignalHandlersInput input) {
312
      if (!WorkflowUnsafe.isReplaying()) {
1✔
313
        StringBuilder signals = new StringBuilder();
1✔
314
        for (SignalRegistrationRequest request : input.getRequests()) {
1✔
315
          if (signals.length() > 0) {
1✔
316
            signals.append(", ");
×
317
          }
318
          signals.append(request.getSignalType());
1✔
319
        }
1✔
320
        trace.add("registerSignalHandlers " + signals);
1✔
321
      }
322
      next.registerSignalHandlers(input);
1✔
323
    }
1✔
324

325
    @Override
326
    public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
327
      if (!WorkflowUnsafe.isReplaying()) {
1✔
328
        StringBuilder updates = new StringBuilder();
1✔
329
        for (UpdateRegistrationRequest request : input.getRequests()) {
1✔
330
          if (updates.length() > 0) {
1✔
331
            updates.append(", ");
1✔
332
          }
333
          updates.append(request.getUpdateName());
1✔
334
        }
1✔
335
        trace.add("registerUpdateHandlers " + updates);
1✔
336
      }
337
      next.registerUpdateHandlers(input);
1✔
338
    }
1✔
339

340
    @Override
341
    public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
342
      if (!WorkflowUnsafe.isReplaying()) {
×
343
        trace.add("registerDynamicUpdateHandler");
×
344
      }
345
      this.registerDynamicUpdateHandler(input);
×
346
    }
×
347

348
    @Override
349
    public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
350
      if (!WorkflowUnsafe.isReplaying()) {
1✔
351
        trace.add("registerDynamicSignalHandler");
1✔
352
      }
353
      next.registerDynamicSignalHandler(input);
1✔
354
    }
1✔
355

356
    @Override
357
    public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
358
      if (!WorkflowUnsafe.isReplaying()) {
1✔
359
        trace.add("registerDynamicQueryHandler");
1✔
360
      }
361
      next.registerDynamicQueryHandler(input);
1✔
362
    }
1✔
363

364
    @Override
365
    public UUID randomUUID() {
366
      if (!WorkflowUnsafe.isReplaying()) {
×
367
        trace.add("randomUUID");
×
368
      }
369
      return next.randomUUID();
×
370
    }
371

372
    @Override
373
    public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
374
      if (!WorkflowUnsafe.isReplaying()) {
1✔
375
        trace.add("upsertSearchAttributes");
1✔
376
      }
377
      next.upsertSearchAttributes(searchAttributes);
1✔
378
    }
1✔
379

380
    @Override
381
    public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
382
      if (!WorkflowUnsafe.isReplaying()) {
1✔
383
        trace.add("upsertTypedSearchAttributes");
1✔
384
      }
385
      next.upsertTypedSearchAttributes(searchAttributeUpdates);
1✔
386
    }
1✔
387

388
    @Override
389
    public Object newChildThread(Runnable runnable, boolean detached, String name) {
390
      if (!WorkflowUnsafe.isReplaying()) {
1✔
391
        trace.add("newThread " + name);
1✔
392
      }
393
      return next.newChildThread(runnable, detached, name);
1✔
394
    }
395

396
    @Override
397
    public long currentTimeMillis() {
398
      if (!WorkflowUnsafe.isReplaying()) {
1✔
399
        trace.add("currentTimeMillis");
1✔
400
      }
401
      return next.currentTimeMillis();
1✔
402
    }
403
  }
404

405
  private static class TracingActivityInboundCallsInterceptor
406
      implements ActivityInboundCallsInterceptor {
407

408
    private final FilteredTrace trace;
409
    private final ActivityInboundCallsInterceptor next;
410
    private String type;
411
    private boolean local;
412

413
    public TracingActivityInboundCallsInterceptor(
414
        FilteredTrace trace, ActivityInboundCallsInterceptor next) {
1✔
415
      this.trace = trace;
1✔
416
      this.next = next;
1✔
417
    }
1✔
418

419
    @Override
420
    public void init(ActivityExecutionContext context) {
421
      this.type = context.getInfo().getActivityType();
1✔
422
      this.local = context.getInfo().isLocal();
1✔
423
      next.init(
1✔
424
          new ActivityExecutionContextBase(context) {
1✔
425
            @Override
426
            public <V> void heartbeat(V details) throws ActivityCompletionException {
427
              trace.add("heartbeat " + details);
1✔
428
              super.heartbeat(details);
1✔
429
            }
1✔
430
          });
431
    }
1✔
432

433
    @Override
434
    public ActivityOutput execute(ActivityInput input) {
435
      trace.add((local ? "local " : "") + "activity " + type);
1✔
436
      return next.execute(input);
1✔
437
    }
438
  }
439
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc