• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2583

31 Oct 2024 09:00PM UTC coverage: 74.8% (+4.2%) from 70.648%
2583

push

buildkite

web-flow
Fix ActivityCompletionClient cancellation and failure by WorkflowExecution (#930)

When sending RespondActivityTaskFailedByIDRequest or RespondActivityTaskCanceledByIDRequest we don't include the ActivityID. Correctly include the id and add test coverage.

2 of 2 new or added lines in 1 file covered. (100.0%)

305 existing lines in 9 files now uncovered.

14500 of 19385 relevant lines covered (74.8%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.18
/src/main/java/com/uber/cadence/internal/tracing/TracingPropagator.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.tracing;
19

20
import com.uber.cadence.Header;
21
import com.uber.cadence.PollForActivityTaskResponse;
22
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
23
import com.uber.cadence.internal.replay.DecisionContext;
24
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
25
import com.uber.cadence.internal.worker.LocalActivityWorker.Task;
26
import io.opentracing.References;
27
import io.opentracing.Span;
28
import io.opentracing.SpanContext;
29
import io.opentracing.Tracer;
30
import io.opentracing.noop.NoopSpan;
31
import io.opentracing.propagation.*;
32
import io.opentracing.propagation.Format;
33
import java.nio.ByteBuffer;
34
import java.util.HashMap;
35
import java.util.Map;
36
import java.util.stream.Collectors;
37

38
public class TracingPropagator {
39
  // span names
40
  private static final String EXECUTE_WORKFLOW = "cadence-ExecuteWorkflow";
41
  private static final String EXECUTE_ACTIVITY = "cadence-ExecuteActivity";
42
  private static final String EXECUTE_LOCAL_ACTIVITY = "cadence-ExecuteLocalActivity";
43

44
  // span tags
45
  private static final String TAG_WORKFLOW_ID = "cadenceWorkflowID";
46
  private static final String TAG_WORKFLOW_TYPE = "cadenceWorkflowType";
47
  private static final String TAG_WORKFLOW_RUN_ID = "cadenceRunID";
48
  private static final String TAG_ACTIVITY_TYPE = "cadenceActivityType";
49

50
  private final Tracer tracer;
51

52
  public TracingPropagator(Tracer tracer) {
1✔
53
    this.tracer = tracer;
1✔
54
  }
1✔
55

56
  public Span spanByServiceMethod(String serviceMethod) {
57
    return tracer.buildSpan(serviceMethod).start();
1✔
58
  }
59

60
  public Span spanForExecuteWorkflow(DecisionContext context) {
61
    WorkflowExecutionStartedEventAttributes attributes =
1✔
62
        context.getWorkflowExecutionStartedEventAttributes();
1✔
63
    SpanContext parent = extract(attributes.getHeader());
1✔
64

65
    return tracer
1✔
66
        .buildSpan(EXECUTE_WORKFLOW)
1✔
67
        .ignoreActiveSpan() // ignore active span to start a new trace that ONLY links the start
1✔
68
        // workflow context
69
        .addReference(
1✔
70
            References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
1✔
71
        .withTag(TAG_WORKFLOW_TYPE, context.getWorkflowType().getName())
1✔
72
        .withTag(TAG_WORKFLOW_ID, context.getWorkflowId())
1✔
73
        .withTag(TAG_WORKFLOW_RUN_ID, context.getRunId())
1✔
74
        .start();
1✔
75
  }
76

77
  public Span spanForExecuteActivity(PollForActivityTaskResponse task) {
78
    SpanContext parent = extract(task.getHeader());
1✔
79
    return tracer
1✔
80
        .buildSpan(EXECUTE_ACTIVITY)
1✔
81
        .ignoreActiveSpan() // ignore active span to start a new trace that ONLY links the execute
1✔
82
        // workflow context
83
        .addReference(
1✔
84
            References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
1✔
85
        .withTag(
1✔
86
            TAG_WORKFLOW_TYPE, task.isSetWorkflowType() ? task.getWorkflowType().getName() : "null")
1✔
87
        .withTag(
1✔
88
            TAG_WORKFLOW_ID,
89
            task.isSetWorkflowExecution() ? task.getWorkflowExecution().getWorkflowId() : "null")
1✔
90
        .withTag(
1✔
91
            TAG_WORKFLOW_RUN_ID,
92
            task.isSetWorkflowExecution() ? task.getWorkflowExecution().getRunId() : "null")
1✔
93
        .withTag(
1✔
94
            TAG_ACTIVITY_TYPE, task.isSetActivityType() ? task.getActivityType().getName() : "null")
1✔
95
        .start();
1✔
96
  }
97

98
  public Span spanForExecuteLocalActivity(Task task) {
99
    ExecuteLocalActivityParameters params = task.getExecuteLocalActivityParameters();
1✔
100

101
    // retrieve spancontext from params
102
    SpanContext parent = extract(params.getContext());
1✔
103

104
    return tracer
1✔
105
        .buildSpan(EXECUTE_LOCAL_ACTIVITY)
1✔
106
        .ignoreActiveSpan()
1✔
107
        .addReference(References.FOLLOWS_FROM, parent)
1✔
108
        .withTag(TAG_WORKFLOW_ID, params.getWorkflowExecution().getWorkflowId())
1✔
109
        .withTag(TAG_WORKFLOW_RUN_ID, params.getWorkflowExecution().getRunId())
1✔
110
        .withTag(TAG_ACTIVITY_TYPE, params.getActivityType().getName())
1✔
111
        .start();
1✔
112
  }
113

114
  public void inject(Map<String, byte[]> headers) {
115
    Map<String, String> context = getCurrentContext();
1✔
116
    context.forEach(
1✔
117
        (k, v) -> {
UNCOV
118
          headers.put(k, v.getBytes());
×
UNCOV
119
        });
×
120
  }
1✔
121

122
  public void inject(Header header) {
123
    Map<String, String> context = getCurrentContext();
1✔
124
    context.forEach(
1✔
125
        (k, v) -> {
126
          header.putToFields(k, ByteBuffer.wrap(v.getBytes()));
1✔
127
        });
1✔
128
  }
1✔
129

130
  private Map<String, String> getCurrentContext() {
131
    Map<String, String> context = new HashMap<>();
1✔
132
    if (tracer.activeSpan() != null) {
1✔
133
      tracer.inject(
1✔
134
          tracer.activeSpan().context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(context));
1✔
135
    }
136
    return context;
1✔
137
  }
138

139
  private SpanContext extract(Map<String, byte[]> headers) {
140
    if (headers == null) return NoopSpan.INSTANCE.context();
1✔
141
    return tracer.extract(
1✔
142
        Format.Builtin.TEXT_MAP,
143
        new TextMapAdapter(
144
            headers
145
                .entrySet()
1✔
146
                .stream()
1✔
147
                .collect(Collectors.toMap(Map.Entry::getKey, e -> new String(e.getValue())))));
1✔
148
  }
149

150
  private SpanContext extract(Header header) {
151
    if (header == null || header.getFields() == null) return NoopSpan.INSTANCE.context();
1✔
152
    return tracer.extract(
1✔
153
        Format.Builtin.TEXT_MAP,
154
        new TextMapAdapter(
155
            header
156
                .getFields()
1✔
157
                .entrySet()
1✔
158
                .stream()
1✔
159
                .collect(
1✔
160
                    Collectors.toMap(
1✔
161
                        Map.Entry::getKey,
162
                        e -> {
163
                          byte[] bytes = new byte[e.getValue().remaining()];
1✔
164
                          e.getValue().duplicate().get(bytes);
1✔
165
                          return new String(bytes);
1✔
166
                        }))));
167
  }
168
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc