• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19681

07 Feb 2025 08:46AM CUT coverage: 88.609% (-0.008%) from 88.617%
#19681

push

github

web-flow
protobuf: Stabilize marshallerWithRecursionLimit (#11884)

33776 of 38118 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.34
/../protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.protobuf.lite;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.protobuf.CodedInputStream;
23
import com.google.protobuf.ExtensionRegistryLite;
24
import com.google.protobuf.InvalidProtocolBufferException;
25
import com.google.protobuf.MessageLite;
26
import com.google.protobuf.Parser;
27
import io.grpc.ExperimentalApi;
28
import io.grpc.KnownLength;
29
import io.grpc.Metadata;
30
import io.grpc.MethodDescriptor.Marshaller;
31
import io.grpc.MethodDescriptor.PrototypeMarshaller;
32
import io.grpc.Status;
33
import java.io.IOException;
34
import java.io.InputStream;
35
import java.io.OutputStream;
36
import java.lang.ref.Reference;
37
import java.lang.ref.WeakReference;
38

39
/**
40
 * Utility methods for using protobuf with grpc.
41
 *
42
 * <p>Note that this class will remain experimental for the foreseeable future as the proto lite
43
 * API, which this class depends on, is not guaranteed to be stable. This is explained in protobuf
44
 * documentation at: https://github.com/protocolbuffers/protobuf/blob/main/java/lite.md
45
 */
46
@ExperimentalApi("Will remain experimental as protobuf lite API is not stable")
47
public final class ProtoLiteUtils {
48

49
  // default visibility to avoid synthetic accessors
50
  static volatile ExtensionRegistryLite globalRegistry =
1✔
51
      ExtensionRegistryLite.getEmptyRegistry();
1✔
52

53
  private static final int BUF_SIZE = 8192;
54

55
  /**
56
   * The same value as {@link io.grpc.internal.GrpcUtil#DEFAULT_MAX_MESSAGE_SIZE}.
57
   */
58
  @VisibleForTesting
59
  static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024;
60

61
  /**
62
   * Sets the global registry for proto marshalling shared across all servers and clients.
63
   *
64
   * <p>Warning:  This API will likely change over time.  It is not possible to have separate
65
   * registries per Process, Server, Channel, Service, or Method.  This is intentional until there
66
   * is a more appropriate API to set them.
67
   *
68
   * <p>Warning:  Do NOT modify the extension registry after setting it.  It is thread safe to call
69
   * {@link #setExtensionRegistry}, but not to modify the underlying object.
70
   *
71
   * <p>If you need custom parsing behavior for protos, you will need to make your own
72
   * {@code MethodDescriptor.Marshaller} for the time being.
73
   *
74
   * @since 1.0.0
75
   */
76
  public static void setExtensionRegistry(ExtensionRegistryLite newRegistry) {
77
    globalRegistry = checkNotNull(newRegistry, "newRegistry");
×
78
  }
×
79

80
  /**
81
   * Creates a {@link Marshaller} for protos of the same type as {@code defaultInstance}.
82
   *
83
   * @since 1.0.0
84
   */
85
  public static <T extends MessageLite> Marshaller<T> marshaller(T defaultInstance) {
86
    // TODO(ejona): consider changing return type to PrototypeMarshaller (assuming ABI safe)
87
    return new MessageMarshaller<>(defaultInstance, -1);
1✔
88
  }
89

90
  /**
91
   * Creates a {@link Marshaller} for protos of the same type as {@code defaultInstance} and a
92
   * custom limit for the recursion depth. Any negative number will leave the limit as its default
93
   * value as defined by the protobuf library.
94
   *
95
   * @since 1.56.0
96
   */
97
  public static <T extends MessageLite> Marshaller<T> marshallerWithRecursionLimit(
98
      T defaultInstance, int recursionLimit) {
99
    return new MessageMarshaller<>(defaultInstance, recursionLimit);
1✔
100
  }
101

102
  /**
103
   * Produce a metadata marshaller for a protobuf type.
104
   *
105
   * @since 1.0.0
106
   */
107
  public static <T extends MessageLite> Metadata.BinaryMarshaller<T> metadataMarshaller(
108
      T defaultInstance) {
109
    return new MetadataMarshaller<>(defaultInstance);
1✔
110
  }
111

112
  /** Copies the data from input stream to output stream. */
113
  static long copy(InputStream from, OutputStream to) throws IOException {
114
    // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
115
    checkNotNull(from, "inputStream cannot be null!");
1✔
116
    checkNotNull(to, "outputStream cannot be null!");
1✔
117
    byte[] buf = new byte[BUF_SIZE];
1✔
118
    long total = 0;
1✔
119
    while (true) {
120
      int r = from.read(buf);
1✔
121
      if (r == -1) {
1✔
122
        break;
1✔
123
      }
124
      to.write(buf, 0, r);
1✔
125
      total += r;
1✔
126
    }
1✔
127
    return total;
1✔
128
  }
129

130
  private ProtoLiteUtils() {
131
  }
132

133
  private static final class MessageMarshaller<T extends MessageLite>
134
      implements PrototypeMarshaller<T> {
135

136
    private static final ThreadLocal<Reference<byte[]>> bufs = new ThreadLocal<>();
1✔
137

138
    private final Parser<T> parser;
139
    private final T defaultInstance;
140
    private final int recursionLimit;
141

142
    @SuppressWarnings("unchecked")
143
    MessageMarshaller(T defaultInstance, int recursionLimit) {
1✔
144
      this.defaultInstance = checkNotNull(defaultInstance, "defaultInstance cannot be null");
1✔
145
      this.parser = (Parser<T>) defaultInstance.getParserForType();
1✔
146
      this.recursionLimit = recursionLimit;
1✔
147
    }
1✔
148

149
    @SuppressWarnings("unchecked")
150
    @Override
151
    public Class<T> getMessageClass() {
152
      // Precisely T since protobuf doesn't let messages extend other messages.
153
      return (Class<T>) defaultInstance.getClass();
1✔
154
    }
155

156
    @Override
157
    public T getMessagePrototype() {
158
      return defaultInstance;
1✔
159
    }
160

161
    @Override
162
    public InputStream stream(T value) {
163
      return new ProtoInputStream(value, parser);
1✔
164
    }
165

166
    @Override
167
    public T parse(InputStream stream) {
168
      if (stream instanceof ProtoInputStream) {
1✔
169
        ProtoInputStream protoStream = (ProtoInputStream) stream;
1✔
170
        // Optimization for in-memory transport. Returning provided object is safe since protobufs
171
        // are immutable.
172
        //
173
        // However, we can't assume the types match, so we have to verify the parser matches.
174
        // Today the parser is always the same for a given proto, but that isn't guaranteed. Even
175
        // if not, using the same MethodDescriptor would ensure the parser matches and permit us
176
        // to enable this optimization.
177
        if (protoStream.parser() == parser) {
1✔
178
          try {
179
            @SuppressWarnings("unchecked")
180
            T message = (T) ((ProtoInputStream) stream).message();
1✔
181
            return message;
1✔
182
          } catch (IllegalStateException ignored) {
1✔
183
            // Stream must have been read from, which is a strange state. Since the point of this
184
            // optimization is to be transparent, instead of throwing an error we'll continue,
185
            // even though it seems likely there's a bug.
186
          }
187
        }
188
      }
189
      CodedInputStream cis = null;
1✔
190
      try {
191
        if (stream instanceof KnownLength) {
1✔
192
          int size = stream.available();
1✔
193
          if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
1✔
194
            Reference<byte[]> ref;
195
            // buf should not be used after this method has returned.
196
            byte[] buf;
197
            if ((ref = bufs.get()) == null || (buf = ref.get()) == null || buf.length < size) {
1✔
198
              buf = new byte[size];
1✔
199
              bufs.set(new WeakReference<>(buf));
1✔
200
            }
201

202
            int remaining = size;
1✔
203
            while (remaining > 0) {
1✔
204
              int position = size - remaining;
1✔
205
              int count = stream.read(buf, position, remaining);
1✔
206
              if (count == -1) {
1✔
207
                break;
×
208
              }
209
              remaining -= count;
1✔
210
            }
1✔
211

212
            if (remaining != 0) {
1✔
213
              int position = size - remaining;
×
214
              throw new RuntimeException("size inaccurate: " + size + " != " + position);
×
215
            }
216
            cis = CodedInputStream.newInstance(buf, 0, size);
1✔
217
          } else if (size == 0) {
1✔
218
            return defaultInstance;
1✔
219
          }
220
        }
221
      } catch (IOException e) {
×
222
        throw new RuntimeException(e);
×
223
      }
1✔
224
      if (cis == null) {
1✔
225
        cis = CodedInputStream.newInstance(stream);
1✔
226
      }
227
      // Pre-create the CodedInputStream so that we can remove the size limit restriction
228
      // when parsing.
229
      cis.setSizeLimit(Integer.MAX_VALUE);
1✔
230

231
      if (recursionLimit >= 0) {
1✔
232
        cis.setRecursionLimit(recursionLimit);
1✔
233
      }
234

235
      try {
236
        return parseFrom(cis);
1✔
237
      } catch (InvalidProtocolBufferException ipbe) {
1✔
238
        throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
1✔
239
            .withCause(ipbe).asRuntimeException();
1✔
240
      }
241
    }
242

243
    private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
244
      T message = parser.parseFrom(stream, globalRegistry);
1✔
245
      try {
246
        stream.checkLastTagWas(0);
1✔
247
        return message;
1✔
248
      } catch (InvalidProtocolBufferException e) {
×
249
        e.setUnfinishedMessage(message);
×
250
        throw e;
×
251
      }
252
    }
253
  }
254

255
  private static final class MetadataMarshaller<T extends MessageLite>
256
      implements Metadata.BinaryMarshaller<T> {
257

258
    private final T defaultInstance;
259

260
    MetadataMarshaller(T defaultInstance) {
1✔
261
      this.defaultInstance = defaultInstance;
1✔
262
    }
1✔
263

264
    @Override
265
    public byte[] toBytes(T value) {
266
      return value.toByteArray();
1✔
267
    }
268

269
    @Override
270
    @SuppressWarnings("unchecked")
271
    public T parseBytes(byte[] serialized) {
272
      try {
273
        return (T) defaultInstance.getParserForType().parseFrom(serialized, globalRegistry);
1✔
274
      } catch (InvalidProtocolBufferException ipbe) {
1✔
275
        throw new IllegalArgumentException(ipbe);
1✔
276
      }
277
    }
278
  }
279
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc