• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19174

25 Apr 2024 10:35PM UTC coverage: 88.091% (+0.02%) from 88.069%
#19174

push

github

ejona86
rls: Document RefCountedChildPolicyWrapperFactory as non-threadsafe

Instead of having docs in RefCountedChildPolicyWrapperFactory saying
that every method was guarded by a lock, I added `@GuardedBy("lock")`
within CachingRlsLbClient, so now it is clearly not thread-safe and the
lock protects access. The AtomicLong was replaced with a long since
1) there was no multi-threading and 2) the logic was not atomic-safe
which was misleading.

31216 of 35436 relevant lines covered (88.09%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.61
/../rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import io.grpc.ChannelLogger.ChannelLogLevel;
26
import io.grpc.ConnectivityState;
27
import io.grpc.LoadBalancer;
28
import io.grpc.LoadBalancer.Helper;
29
import io.grpc.LoadBalancer.Subchannel;
30
import io.grpc.LoadBalancer.SubchannelPicker;
31
import io.grpc.LoadBalancerProvider;
32
import io.grpc.LoadBalancerRegistry;
33
import io.grpc.NameResolver.ConfigOrError;
34
import io.grpc.internal.ObjectPool;
35
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
36
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
37
import io.grpc.util.ForwardingLoadBalancerHelper;
38
import java.util.ArrayList;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.List;
42
import java.util.Map;
43
import java.util.Objects;
44
import javax.annotation.Nullable;
45

46
/** Configuration for RLS load balancing policy. */
47
final class LbPolicyConfiguration {
48

49
  /** Route lookup settings; validated as non-null by the constructor. */
  private final RouteLookupConfig routeLookupConfig;

  /** Raw service config for the route lookup channel; may be {@code null}. */
  @Nullable
  private final Map<String, ?> routeLookupChannelServiceConfig;

  /** Elected child load balancing policy; validated as non-null by the constructor. */
  private final ChildLoadBalancingPolicy policy;

  /**
   * Creates a configuration from its three components.
   *
   * @param routeLookupConfig the route lookup configuration, must not be null
   * @param routeLookupChannelServiceConfig the service config for the route lookup channel, or
   *     {@code null}
   * @param policy the elected child load balancing policy, must not be null
   * @throws NullPointerException if {@code routeLookupConfig} or {@code policy} is null
   */
  LbPolicyConfiguration(
      RouteLookupConfig routeLookupConfig,
      @Nullable Map<String, ?> routeLookupChannelServiceConfig,
      ChildLoadBalancingPolicy policy) {
    // Required arguments are checked in declaration order; the optional one is stored as-is.
    this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig");
    this.routeLookupChannelServiceConfig = routeLookupChannelServiceConfig;
    this.policy = checkNotNull(policy, "policy");
  }
1✔
61

62
  RouteLookupConfig getRouteLookupConfig() {
63
    return routeLookupConfig;
1✔
64
  }
65

66
  @Nullable
67
  Map<String, ?> getRouteLookupChannelServiceConfig() {
68
    return routeLookupChannelServiceConfig;
1✔
69
  }
70

71
  ChildLoadBalancingPolicy getLoadBalancingPolicy() {
72
    return policy;
1✔
73
  }
74

75
  @Override
76
  public boolean equals(Object o) {
77
    if (this == o) {
1✔
78
      return true;
×
79
    }
80
    if (o == null || getClass() != o.getClass()) {
1✔
81
      return false;
1✔
82
    }
83
    LbPolicyConfiguration that = (LbPolicyConfiguration) o;
×
84
    return Objects.equals(routeLookupConfig, that.routeLookupConfig)
×
85
        && Objects.equals(routeLookupChannelServiceConfig, that.routeLookupChannelServiceConfig)
×
86
        && Objects.equals(policy, that.policy);
×
87
  }
88

89
  @Override
90
  public int hashCode() {
91
    return Objects.hash(routeLookupConfig, routeLookupChannelServiceConfig, policy);
×
92
  }
93

94
  @Override
95
  public String toString() {
96
    return MoreObjects.toStringHelper(this)
×
97
        .add("routeLookupConfig", routeLookupConfig)
×
98
        .add("routeLookupChannelServiceConfig", routeLookupChannelServiceConfig)
×
99
        .add("policy", policy)
×
100
        .toString();
×
101
  }
102

103
  /** ChildLoadBalancingPolicy is an elected child policy to delegate requests. */
104
  static final class ChildLoadBalancingPolicy {
105

106
    private final Map<String, Object> effectiveRawChildPolicy;
107
    private final LoadBalancerProvider effectiveLbProvider;
108
    private final String targetFieldName;
109

110
    @VisibleForTesting
111
    ChildLoadBalancingPolicy(
112
        String targetFieldName,
113
        Map<String, Object> effectiveRawChildPolicy,
114
        LoadBalancerProvider effectiveLbProvider) {
1✔
115
      checkArgument(
1✔
116
          targetFieldName != null && !targetFieldName.isEmpty(),
1✔
117
          "targetFieldName cannot be empty or null");
118
      this.targetFieldName = targetFieldName;
1✔
119
      this.effectiveRawChildPolicy =
1✔
120
          checkNotNull(effectiveRawChildPolicy, "effectiveRawChildPolicy");
1✔
121
      this.effectiveLbProvider = checkNotNull(effectiveLbProvider, "effectiveLbProvider");
1✔
122
    }
1✔
123

124
    /** Creates ChildLoadBalancingPolicy. */
125
    @SuppressWarnings("unchecked")
126
    static ChildLoadBalancingPolicy create(
127
        String childPolicyConfigTargetFieldName, List<Map<String, ?>> childPolicies)
128
        throws InvalidChildPolicyConfigException {
129
      Map<String, Object> effectiveChildPolicy = null;
1✔
130
      LoadBalancerProvider effectiveLbProvider = null;
1✔
131
      List<String> policyTried = new ArrayList<>();
1✔
132

133
      LoadBalancerRegistry lbRegistry = LoadBalancerRegistry.getDefaultRegistry();
1✔
134
      for (Map<String, ?> childPolicy : childPolicies) {
1✔
135
        if (childPolicy.isEmpty()) {
1✔
136
          continue;
×
137
        }
138
        if (childPolicy.size() != 1) {
1✔
139
          throw
1✔
140
              new InvalidChildPolicyConfigException(
141
                  "childPolicy should have exactly one loadbalancing policy");
142
        }
143
        String policyName = childPolicy.keySet().iterator().next();
1✔
144
        LoadBalancerProvider provider = lbRegistry.getProvider(policyName);
1✔
145
        if (provider != null) {
1✔
146
          effectiveLbProvider = provider;
1✔
147
          effectiveChildPolicy = Collections.unmodifiableMap(childPolicy);
1✔
148
          break;
1✔
149
        }
150
        policyTried.add(policyName);
×
151
      }
×
152
      if (effectiveChildPolicy == null) {
1✔
153
        throw
1✔
154
            new InvalidChildPolicyConfigException(
155
                String.format("no valid childPolicy found, policy tried: %s", policyTried));
1✔
156
      }
157
      return
1✔
158
          new ChildLoadBalancingPolicy(
159
              childPolicyConfigTargetFieldName,
160
              (Map<String, Object>) effectiveChildPolicy.values().iterator().next(),
1✔
161
              effectiveLbProvider);
162
    }
163

164
    /** Creates a child load balancer config for given target from elected raw child policy. */
165
    Map<String, ?> getEffectiveChildPolicy(String target) {
166
      Map<String, Object> childPolicy = new HashMap<>(effectiveRawChildPolicy);
1✔
167
      childPolicy.put(targetFieldName, target);
1✔
168
      return childPolicy;
1✔
169
    }
170

171
    /** Returns the elected child {@link LoadBalancerProvider}. */
172
    LoadBalancerProvider getEffectiveLbProvider() {
173
      return effectiveLbProvider;
1✔
174
    }
175

176
    @Override
177
    public boolean equals(Object o) {
178
      if (this == o) {
×
179
        return true;
×
180
      }
181
      if (o == null || getClass() != o.getClass()) {
×
182
        return false;
×
183
      }
184
      ChildLoadBalancingPolicy that = (ChildLoadBalancingPolicy) o;
×
185
      return Objects.equals(effectiveRawChildPolicy, that.effectiveRawChildPolicy)
×
186
          && Objects.equals(effectiveLbProvider, that.effectiveLbProvider)
×
187
          && Objects.equals(targetFieldName, that.targetFieldName);
×
188
    }
189

190
    @Override
191
    public int hashCode() {
192
      return Objects.hash(effectiveRawChildPolicy, effectiveLbProvider, targetFieldName);
×
193
    }
194

195
    @Override
196
    public String toString() {
197
      return MoreObjects.toStringHelper(this)
×
198
          .add("effectiveRawChildPolicy", effectiveRawChildPolicy)
×
199
          .add("effectiveLbProvider", effectiveLbProvider)
×
200
          .add("childPolicyConfigTargetFieldName", targetFieldName)
×
201
          .toString();
×
202
    }
203
  }
204

205
  /** Factory for {@link ChildPolicyWrapper}. Not thread-safe. */
206
  static final class RefCountedChildPolicyWrapperFactory {
207
    @VisibleForTesting
1✔
208
    final Map<String /* target */, RefCountedChildPolicyWrapper> childPolicyMap =
209
        new HashMap<>();
210

211
    private final ChildLoadBalancerHelperProvider childLbHelperProvider;
212
    private final ChildLbStatusListener childLbStatusListener;
213
    private final ChildLoadBalancingPolicy childPolicy;
214
    private final ResolvedAddressFactory childLbResolvedAddressFactory;
215

216
    public RefCountedChildPolicyWrapperFactory(
217
        ChildLoadBalancingPolicy childPolicy,
218
        ResolvedAddressFactory childLbResolvedAddressFactory,
219
        ChildLoadBalancerHelperProvider childLbHelperProvider,
220
        ChildLbStatusListener childLbStatusListener) {
1✔
221
      this.childPolicy = checkNotNull(childPolicy, "childPolicy");
1✔
222
      this.childLbResolvedAddressFactory =
1✔
223
          checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory");
1✔
224
      this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
1✔
225
      this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
1✔
226
    }
1✔
227

228
    ChildPolicyWrapper createOrGet(String target) {
229
      // TODO(creamsoup) check if the target is valid or not
230
      RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
1✔
231
      if (pooledChildPolicyWrapper == null) {
1✔
232
        ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper(
1✔
233
            target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider,
234
            childLbStatusListener);
235
        pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
1✔
236
        childPolicyMap.put(target, pooledChildPolicyWrapper);
1✔
237
        return pooledChildPolicyWrapper.getObject();
1✔
238
      } else {
239
        ChildPolicyWrapper childPolicyWrapper = pooledChildPolicyWrapper.getObject();
1✔
240
        if (childPolicyWrapper.getPicker() != null) {
1✔
241
          childPolicyWrapper.refreshState();
1✔
242
        }
243
        return childPolicyWrapper;
1✔
244
      }
245
    }
246

247
    List<ChildPolicyWrapper> createOrGet(List<String> targets) {
248
      List<ChildPolicyWrapper> retVal = new ArrayList<>();
1✔
249
      for (String target : targets) {
1✔
250
        retVal.add(createOrGet(target));
1✔
251
      }
1✔
252
      return retVal;
1✔
253
    }
254

255
    void release(ChildPolicyWrapper childPolicyWrapper) {
256
      checkNotNull(childPolicyWrapper, "childPolicyWrapper");
1✔
257
      String target = childPolicyWrapper.getTarget();
1✔
258
      RefCountedChildPolicyWrapper existing = childPolicyMap.get(target);
1✔
259
      checkState(existing != null, "Cannot access already released object");
1✔
260
      existing.returnObject(childPolicyWrapper);
1✔
261
      if (existing.isReleased()) {
1✔
262
        childPolicyMap.remove(target);
1✔
263
      }
264
    }
1✔
265
  }
266

267
  /**
268
   * ChildPolicyWrapper is a wrapper class for child load balancing policy with associated helper /
269
   * utility classes to manage the child policy.
270
   */
271
  static final class ChildPolicyWrapper {
272

273
    private final String target;
274
    private final ChildPolicyReportingHelper helper;
275
    private final LoadBalancer lb;
276
    private volatile SubchannelPicker picker;
277
    private ConnectivityState state;
278

279
    public ChildPolicyWrapper(
280
        String target,
281
        ChildLoadBalancingPolicy childPolicy,
282
        final ResolvedAddressFactory childLbResolvedAddressFactory,
283
        ChildLoadBalancerHelperProvider childLbHelperProvider,
284
        ChildLbStatusListener childLbStatusListener) {
1✔
285
      this.target = target;
1✔
286
      this.helper =
1✔
287
          new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
288
      LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
1✔
289
      final ConfigOrError lbConfig =
1✔
290
          lbProvider
291
              .parseLoadBalancingPolicyConfig(
1✔
292
                  childPolicy.getEffectiveChildPolicy(target));
1✔
293
      this.lb = lbProvider.newLoadBalancer(helper);
1✔
294
      helper.getChannelLogger().log(
1✔
295
          ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", lbConfig.getConfig());
1✔
296
      helper.getSynchronizationContext().execute(
1✔
297
          new Runnable() {
1✔
298
            @Override
299
            public void run() {
300
              if (!lb.acceptResolvedAddresses(
1✔
301
                  childLbResolvedAddressFactory.create(lbConfig.getConfig())).isOk()) {
1✔
302
                helper.refreshNameResolution();
1✔
303
              }
304
              lb.requestConnection();
1✔
305
            }
1✔
306
          });
307
    }
1✔
308

309
    String getTarget() {
310
      return target;
1✔
311
    }
312

313
    SubchannelPicker getPicker() {
314
      return picker;
1✔
315
    }
316

317
    ChildPolicyReportingHelper getHelper() {
318
      return helper;
1✔
319
    }
320

321
    public ConnectivityState getState() {
322
      return state;
1✔
323
    }
324

325
    void refreshState() {
326
      helper.getSynchronizationContext().execute(
1✔
327
          new Runnable() {
1✔
328
            @Override
329
            public void run() {
330
              helper.updateBalancingState(state, picker);
1✔
331
            }
1✔
332
          }
333
      );
334
    }
1✔
335

336
    void shutdown() {
337
      helper.getSynchronizationContext().execute(
1✔
338
          new Runnable() {
1✔
339
            @Override
340
            public void run() {
341
              lb.shutdown();
1✔
342
            }
1✔
343
          }
344
      );
345
    }
1✔
346

347
    @Override
348
    public String toString() {
349
      return MoreObjects.toStringHelper(this)
×
350
          .add("target", target)
×
351
          .add("picker", picker)
×
352
          .add("state", state)
×
353
          .toString();
×
354
    }
355

356
    /**
357
     * A delegating {@link io.grpc.LoadBalancer.Helper} maintains status of {@link
358
     * ChildPolicyWrapper} when {@link Subchannel} status changed. This helper is used between child
359
     * policy and parent load-balancer where each picker in child policy is governed by a governing
360
     * picker (RlsPicker). The governing picker will be reported back to the parent load-balancer.
361
     */
362
    final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper {
363

364
      private final ChildLoadBalancerHelper delegate;
365
      private final ChildLbStatusListener listener;
366

367
      ChildPolicyReportingHelper(
368
          ChildLoadBalancerHelperProvider childHelperProvider,
369
          ChildLbStatusListener listener) {
1✔
370
        checkNotNull(childHelperProvider, "childHelperProvider");
1✔
371
        this.delegate = childHelperProvider.forTarget(getTarget());
1✔
372
        this.listener = checkNotNull(listener, "listener");
1✔
373
      }
1✔
374

375
      @Override
376
      protected Helper delegate() {
377
        return delegate;
1✔
378
      }
379

380
      @Override
381
      public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
382
        picker = newPicker;
1✔
383
        state = newState;
1✔
384
        super.updateBalancingState(newState, newPicker);
1✔
385
        listener.onStatusChanged(newState);
1✔
386
      }
1✔
387
    }
388
  }
389

390
  /** Listener for child lb status change events. */
391
  interface ChildLbStatusListener {
392

393
    /** Notifies when child lb status changes. */
394
    void onStatusChanged(ConnectivityState newState);
395
  }
396

397
  private static final class RefCountedChildPolicyWrapper
398
      implements ObjectPool<ChildPolicyWrapper> {
399

400
    private long refCnt;
401
    @Nullable
402
    private ChildPolicyWrapper childPolicyWrapper;
403

404
    private RefCountedChildPolicyWrapper(ChildPolicyWrapper childPolicyWrapper) {
1✔
405
      this.childPolicyWrapper = checkNotNull(childPolicyWrapper, "childPolicyWrapper");
1✔
406
    }
1✔
407

408
    @Override
409
    public ChildPolicyWrapper getObject() {
410
      checkState(!isReleased(), "ChildPolicyWrapper is already released");
1✔
411
      refCnt++;
1✔
412
      return childPolicyWrapper;
1✔
413
    }
414

415
    @Override
416
    @Nullable
417
    public ChildPolicyWrapper returnObject(Object object) {
418
      checkState(
1✔
419
          !isReleased(),
1✔
420
          "cannot return already released ChildPolicyWrapper, this is possibly a bug.");
421
      checkState(
1✔
422
          childPolicyWrapper == object,
423
          "returned object doesn't match the pooled childPolicyWrapper");
424
      long newCnt = --refCnt;
1✔
425
      checkState(newCnt != -1, "Cannot return never pooled childPolicyWrapper");
1✔
426
      if (newCnt == 0) {
1✔
427
        childPolicyWrapper.shutdown();
1✔
428
        childPolicyWrapper = null;
1✔
429
      }
430
      return null;
1✔
431
    }
432

433
    boolean isReleased() {
434
      return childPolicyWrapper == null;
1✔
435
    }
436

437
    static RefCountedChildPolicyWrapper of(ChildPolicyWrapper childPolicyWrapper) {
438
      return new RefCountedChildPolicyWrapper(childPolicyWrapper);
1✔
439
    }
440

441
    @Override
442
    public String toString() {
443
      return MoreObjects.toStringHelper(this)
×
444
          .add("object", childPolicyWrapper)
×
445
          .add("refCnt", refCnt)
×
446
          .toString();
×
447
    }
448
  }
449

450
  /** Exception thrown when attempting to parse child policy encountered parsing issue. */
451
  static final class InvalidChildPolicyConfigException extends Exception {
452

453
    private static final long serialVersionUID = 0L;
454

455
    InvalidChildPolicyConfigException(String message) {
456
      super(message);
1✔
457
    }
1✔
458

459
    @Override
460
    public synchronized Throwable fillInStackTrace() {
461
      // no stack trace above this point
462
      return this;
1✔
463
    }
464
  }
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc