• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19511

16 Oct 2024 11:12PM UTC coverage: 84.668% (+0.01%) from 84.654%
#19511

push

github

web-flow
Get mesh_id local label from "CSM_MESH_ID" environment variable, rather than parsing from bootstrap file (#11621)

33807 of 39929 relevant lines covered (84.67%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.38
/../gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/MetadataExchanger.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.gcp.csm.observability;
18

19
import com.google.common.base.Preconditions;
20
import com.google.common.io.BaseEncoding;
21
import com.google.protobuf.Struct;
22
import com.google.protobuf.Value;
23
import io.grpc.CallOptions;
24
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
25
import io.grpc.Metadata;
26
import io.grpc.ServerBuilder;
27
import io.grpc.ServerCall;
28
import io.grpc.ServerCallHandler;
29
import io.grpc.ServerInterceptor;
30
import io.grpc.Status;
31
import io.grpc.opentelemetry.InternalOpenTelemetryPlugin;
32
import io.grpc.protobuf.ProtoUtils;
33
import io.grpc.xds.ClusterImplLoadBalancerProvider;
34
import io.opentelemetry.api.common.AttributeKey;
35
import io.opentelemetry.api.common.Attributes;
36
import io.opentelemetry.api.common.AttributesBuilder;
37
import io.opentelemetry.contrib.gcp.resource.GCPResourceProvider;
38
import io.opentelemetry.sdk.autoconfigure.ResourceConfiguration;
39
import java.net.URI;
40
import java.util.Map;
41
import java.util.function.Consumer;
42

43
/**
44
 * OpenTelemetryPlugin implementing metadata-based workload property exchange for both client and
45
 * server. Is responsible for determining the metadata, communicating the metadata, and adding local
46
 * and remote details to metrics.
47
 */
48
final class MetadataExchanger implements InternalOpenTelemetryPlugin {
49

50
  private static final AttributeKey<String> CLOUD_PLATFORM =
1✔
51
      AttributeKey.stringKey("cloud.platform");
1✔
52
  private static final AttributeKey<String> K8S_NAMESPACE_NAME =
1✔
53
      AttributeKey.stringKey("k8s.namespace.name");
1✔
54
  private static final AttributeKey<String> K8S_CLUSTER_NAME =
1✔
55
      AttributeKey.stringKey("k8s.cluster.name");
1✔
56
  private static final AttributeKey<String> CLOUD_AVAILABILITY_ZONE =
1✔
57
      AttributeKey.stringKey("cloud.availability_zone");
1✔
58
  private static final AttributeKey<String> CLOUD_REGION =
1✔
59
      AttributeKey.stringKey("cloud.region");
1✔
60
  private static final AttributeKey<String> CLOUD_ACCOUNT_ID =
1✔
61
      AttributeKey.stringKey("cloud.account.id");
1✔
62

63
  private static final Metadata.Key<String> SEND_KEY =
1✔
64
      Metadata.Key.of("x-envoy-peer-metadata", Metadata.ASCII_STRING_MARSHALLER);
1✔
65
  private static final Metadata.Key<Struct> RECV_KEY =
1✔
66
      Metadata.Key.of("x-envoy-peer-metadata", new BinaryToAsciiMarshaller<>(
1✔
67
          ProtoUtils.metadataMarshaller(Struct.getDefaultInstance())));
1✔
68

69
  private static final String EXCHANGE_TYPE = "type";
70
  private static final String EXCHANGE_CANONICAL_SERVICE = "canonical_service";
71
  private static final String EXCHANGE_PROJECT_ID = "project_id";
72
  private static final String EXCHANGE_LOCATION = "location";
73
  private static final String EXCHANGE_CLUSTER_NAME = "cluster_name";
74
  private static final String EXCHANGE_NAMESPACE_NAME = "namespace_name";
75
  private static final String EXCHANGE_WORKLOAD_NAME = "workload_name";
76
  private static final String TYPE_GKE = "gcp_kubernetes_engine";
77
  private static final String TYPE_GCE = "gcp_compute_engine";
78

79
  private final String localMetadata;
80
  private final Attributes localAttributes;
81

82
  /**
   * Creates an exchanger from the attributes reported by {@link GCPResourceProvider}, combined
   * with OpenTelemetry's environment-derived resource attributes, resolving variables through
   * {@link System#getenv(String)}.
   */
  public MetadataExchanger() {
    this(
        addOtelResourceAttributes(new GCPResourceProvider().getAttributes()),
        name -> System.getenv(name));
  }
×
87

88
  MetadataExchanger(Attributes platformAttributes, Lookup env) {
1✔
89
    String type = platformAttributes.get(CLOUD_PLATFORM);
1✔
90
    String canonicalService = env.get("CSM_CANONICAL_SERVICE_NAME");
1✔
91
    Struct.Builder struct = Struct.newBuilder();
1✔
92
    put(struct, EXCHANGE_TYPE, type);
1✔
93
    put(struct, EXCHANGE_CANONICAL_SERVICE, canonicalService);
1✔
94
    if (TYPE_GKE.equals(type)) {
1✔
95
      String location = platformAttributes.get(CLOUD_AVAILABILITY_ZONE);
1✔
96
      if (location == null) {
1✔
97
        location = platformAttributes.get(CLOUD_REGION);
1✔
98
      }
99
      put(struct, EXCHANGE_WORKLOAD_NAME,  env.get("CSM_WORKLOAD_NAME"));
1✔
100
      put(struct, EXCHANGE_NAMESPACE_NAME, platformAttributes.get(K8S_NAMESPACE_NAME));
1✔
101
      put(struct, EXCHANGE_CLUSTER_NAME,   platformAttributes.get(K8S_CLUSTER_NAME));
1✔
102
      put(struct, EXCHANGE_LOCATION,       location);
1✔
103
      put(struct, EXCHANGE_PROJECT_ID,     platformAttributes.get(CLOUD_ACCOUNT_ID));
1✔
104
    } else if (TYPE_GCE.equals(type)) {
1✔
105
      String location = platformAttributes.get(CLOUD_AVAILABILITY_ZONE);
1✔
106
      if (location == null) {
1✔
107
        location = platformAttributes.get(CLOUD_REGION);
1✔
108
      }
109
      put(struct, EXCHANGE_WORKLOAD_NAME, env.get("CSM_WORKLOAD_NAME"));
1✔
110
      put(struct, EXCHANGE_LOCATION,      location);
1✔
111
      put(struct, EXCHANGE_PROJECT_ID,    platformAttributes.get(CLOUD_ACCOUNT_ID));
1✔
112
    }
113
    localMetadata = BaseEncoding.base64().encode(struct.build().toByteArray());
1✔
114

115
    localAttributes = Attributes.builder()
1✔
116
        .put("csm.mesh_id", nullIsUnknown(env.get("CSM_MESH_ID")))
1✔
117
        .put("csm.workload_canonical_service", nullIsUnknown(canonicalService))
1✔
118
        .build();
1✔
119
  }
1✔
120

121
  private static String nullIsUnknown(String value) {
122
    return value == null ? "unknown" : value;
1✔
123
  }
124

125
  private static void put(Struct.Builder struct, String key, String value) {
126
    value = nullIsUnknown(value);
1✔
127
    struct.putFields(key, Value.newBuilder().setStringValue(value).build());
1✔
128
  }
1✔
129

130
  private static void put(AttributesBuilder attributes, String key, Value value) {
131
    attributes.put(key, nullIsUnknown(fromValue(value)));
1✔
132
  }
1✔
133

134
  private static String fromValue(Value value) {
135
    if (value == null) {
1✔
136
      return null;
1✔
137
    }
138
    if (value.getKindCase() != Value.KindCase.STRING_VALUE) {
1✔
139
      return null;
1✔
140
    }
141
    return value.getStringValue();
1✔
142
  }
143

144
  private static Attributes addOtelResourceAttributes(Attributes platformAttributes) {
145
    // Can't inject env variables as ResourceConfiguration requires the large ConfigProperties API
146
    // to inject our own values and a default implementation isn't provided. So this reads directly
147
    // from System.getenv().
148
    Attributes envAttributes = ResourceConfiguration
149
        .createEnvironmentResource()
×
150
        .getAttributes();
×
151

152
    AttributesBuilder builder = platformAttributes.toBuilder();
×
153
    builder.putAll(envAttributes);
×
154
    return builder.build();
×
155
  }
156

157
  private void addLabels(AttributesBuilder to, Struct struct) {
158
    to.putAll(localAttributes);
1✔
159
    Map<String, Value> remote = struct.getFieldsMap();
1✔
160
    Value typeValue = remote.get(EXCHANGE_TYPE);
1✔
161
    String type = fromValue(typeValue);
1✔
162
    put(to, "csm.remote_workload_type", typeValue);
1✔
163
    put(to, "csm.remote_workload_canonical_service", remote.get(EXCHANGE_CANONICAL_SERVICE));
1✔
164
    if (TYPE_GKE.equals(type)) {
1✔
165
      put(to, "csm.remote_workload_project_id",     remote.get(EXCHANGE_PROJECT_ID));
1✔
166
      put(to, "csm.remote_workload_location",       remote.get(EXCHANGE_LOCATION));
1✔
167
      put(to, "csm.remote_workload_cluster_name",   remote.get(EXCHANGE_CLUSTER_NAME));
1✔
168
      put(to, "csm.remote_workload_namespace_name", remote.get(EXCHANGE_NAMESPACE_NAME));
1✔
169
      put(to, "csm.remote_workload_name",           remote.get(EXCHANGE_WORKLOAD_NAME));
1✔
170
    } else if (TYPE_GCE.equals(type)) {
1✔
171
      put(to, "csm.remote_workload_project_id",     remote.get(EXCHANGE_PROJECT_ID));
1✔
172
      put(to, "csm.remote_workload_location",       remote.get(EXCHANGE_LOCATION));
1✔
173
      put(to, "csm.remote_workload_name",           remote.get(EXCHANGE_WORKLOAD_NAME));
1✔
174
    }
175
  }
1✔
176

177
  @Override
178
  public boolean enablePluginForChannel(String target) {
179
    URI uri;
180
    try {
181
      uri = new URI(target);
1✔
182
    } catch (Exception ex) {
1✔
183
      return false;
1✔
184
    }
1✔
185
    String authority = uri.getAuthority();
1✔
186
    return "xds".equals(uri.getScheme())
1✔
187
        && (authority == null || "traffic-director-global.xds.googleapis.com".equals(authority));
1✔
188
  }
189

190
  @Override
191
  public ClientCallPlugin newClientCallPlugin() {
192
    return new ClientCallState();
1✔
193
  }
194

195
  public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
196
    serverBuilder.intercept(new ServerCallInterceptor());
1✔
197
  }
1✔
198

199
  @Override
200
  public ServerStreamPlugin newServerStreamPlugin(Metadata inboundMetadata) {
201
    return new ServerStreamState(inboundMetadata.get(RECV_KEY));
1✔
202
  }
203

204
  final class ClientCallState implements ClientCallPlugin {
1✔
205
    private volatile Value serviceName;
206
    private volatile Value serviceNamespace;
207

208
    @Override
209
    public ClientStreamPlugin newClientStreamPlugin() {
210
      return new ClientStreamState();
1✔
211
    }
212

213
    @Override
214
    public CallOptions filterCallOptions(CallOptions options) {
215
      Consumer<Map<String, Struct>> existingConsumer =
1✔
216
          options.getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER);
1✔
217
      return options.withOption(
1✔
218
          ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER,
219
          (Map<String, Struct> clusterMetadata) -> {
220
            metadataConsumer(clusterMetadata);
1✔
221
            existingConsumer.accept(clusterMetadata);
1✔
222
          });
1✔
223
    }
224

225
    private void metadataConsumer(Map<String, Struct> clusterMetadata) {
226
      Struct struct = clusterMetadata.get("com.google.csm.telemetry_labels");
1✔
227
      if (struct == null) {
1✔
228
        struct = Struct.getDefaultInstance();
1✔
229
      }
230
      serviceName = struct.getFieldsMap().get("service_name");
1✔
231
      serviceNamespace = struct.getFieldsMap().get("service_namespace");
1✔
232
    }
1✔
233

234
    @Override
235
    public void addMetadata(Metadata toMetadata) {
236
      toMetadata.put(SEND_KEY, localMetadata);
1✔
237
    }
1✔
238

239
    class ClientStreamState implements ClientStreamPlugin {
1✔
240
      private Struct receivedExchange;
241

242
      @Override
243
      public void inboundHeaders(Metadata headers) {
244
        setExchange(headers);
1✔
245
      }
1✔
246

247
      @Override
248
      public void inboundTrailers(Metadata trailers) {
249
        if (receivedExchange != null) {
1✔
250
          return; // Received headers
1✔
251
        }
252
        setExchange(trailers);
1✔
253
      }
1✔
254

255
      private void setExchange(Metadata metadata) {
256
        Struct received = metadata.get(RECV_KEY);
1✔
257
        if (received == null) {
1✔
258
          receivedExchange = Struct.getDefaultInstance();
1✔
259
        } else {
260
          receivedExchange = received;
1✔
261
        }
262
      }
1✔
263

264
      @Override
265
      public void addLabels(AttributesBuilder to) {
266
        put(to, "csm.service_name",           serviceName);
1✔
267
        put(to, "csm.service_namespace_name", serviceNamespace);
1✔
268
        Struct exchange = receivedExchange;
1✔
269
        if (exchange == null) {
1✔
270
          exchange = Struct.getDefaultInstance();
×
271
        }
272
        MetadataExchanger.this.addLabels(to, exchange);
1✔
273
      }
1✔
274
    }
275
  }
276

277
  final class ServerCallInterceptor implements ServerInterceptor {
1✔
278
    @Override
279
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
280
        ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
281
      if (!headers.containsKey(RECV_KEY)) {
1✔
282
        return next.startCall(call, headers);
1✔
283
      } else {
284
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
1✔
285
          private boolean headersSent;
286

287
          @Override
288
          public void sendHeaders(Metadata headers) {
289
            headersSent = true;
1✔
290
            headers.put(SEND_KEY, localMetadata);
1✔
291
            super.sendHeaders(headers);
1✔
292
          }
1✔
293

294
          @Override
295
          public void close(Status status, Metadata trailers) {
296
            if (!headersSent) {
1✔
297
              trailers.put(SEND_KEY, localMetadata);
1✔
298
            }
299
            super.close(status, trailers);
1✔
300
          }
1✔
301
        }, headers);
302
      }
303
    }
304
  }
305

306
  final class ServerStreamState implements ServerStreamPlugin {
307
    private final Struct receivedExchange;
308

309
    ServerStreamState(Struct exchange) {
1✔
310
      if (exchange == null) {
1✔
311
        exchange = Struct.getDefaultInstance();
1✔
312
      }
313
      receivedExchange = exchange;
1✔
314
    }
1✔
315

316
    @Override
317
    public void addLabels(AttributesBuilder to) {
318
      MetadataExchanger.this.addLabels(to, receivedExchange);
1✔
319
    }
1✔
320
  }
321

322
  /**
   * Resolves a named setting to its value. The public constructor supplies
   * {@code System::getenv}; the annotation keeps this interface usable as a method-reference
   * or lambda target.
   */
  @FunctionalInterface
  interface Lookup {
    /**
     * Returns the value associated with {@code name}. Callers in this class treat a
     * {@code null} result as "unknown".
     */
    String get(String name);
  }
325

326
  /**
   * A value source that is allowed to fail with a checked exception. Inside this class the
   * simple name {@code Supplier} refers to this interface, not to
   * {@code java.util.function.Supplier}.
   *
   * @param <T> type of the supplied value
   */
  @FunctionalInterface
  interface Supplier<T> {
    /**
     * Produces the value.
     *
     * @throws Exception if the value cannot be produced
     */
    T get() throws Exception;
  }
329

330
  static final class BinaryToAsciiMarshaller<T> implements Metadata.AsciiMarshaller<T> {
331
    private final Metadata.BinaryMarshaller<T> delegate;
332

333
    public BinaryToAsciiMarshaller(Metadata.BinaryMarshaller<T> delegate) {
1✔
334
      this.delegate = Preconditions.checkNotNull(delegate, "delegate");
1✔
335
    }
1✔
336

337
    @Override
338
    public T parseAsciiString(String serialized) {
339
      return delegate.parseBytes(BaseEncoding.base64().decode(serialized));
1✔
340
    }
341

342
    @Override
343
    public String toAsciiString(T value) {
344
      return BaseEncoding.base64().encode(delegate.toBytes(value));
×
345
    }
346
  }
347
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc