Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to respect the marshaller specified in gRPC MethodDescriptor #5630

Merged
merged 16 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
b808a8b
feat: add option to respect the marshaller specified in gRPC MethodDe…
jaeseung-bae Apr 22, 2024
24fe248
Update grpc/src/main/java/com/linecorp/armeria/client/grpc/GrpcClient…
jaeseung-bae Apr 23, 2024
e4fa595
Update grpc/src/main/java/com/linecorp/armeria/client/grpc/GrpcClient…
jaeseung-bae Apr 23, 2024
e11b59d
test: add test for GrpcClientBuilder to set use-method-marshaller cor…
jaeseung-bae Apr 24, 2024
95d2ed1
test: update test for GrpcClientBuilder to set use-method-marshaller …
jaeseung-bae Apr 25, 2024
e361a5d
Merge branch 'main' into feat/use-method-marshaller
ikhoon Apr 25, 2024
e848c98
Update grpc/src/test/java/com/linecorp/armeria/server/grpc/UnaryServe…
jaeseung-bae Apr 25, 2024
76ba74c
test: update test for Client and Server to set use-method-marshaller …
jaeseung-bae Apr 26, 2024
e074d06
Merge branch 'main' into feat/use-method-marshaller
jaeseung-bae Apr 26, 2024
6bd0be1
chore: move test helper class to the bottom code to focus on testcase…
jaeseung-bae Apr 28, 2024
06c5268
chore: make call count variable as local and use different call count…
jaeseung-bae Apr 28, 2024
5543dc1
chore: add validate logic for 'unsafeWrapRequestBuffers' and 'useMeth…
jaeseung-bae Apr 29, 2024
7d25d00
chore: deserializeProto now checks useMethodMarshaller
jaeseung-bae Apr 30, 2024
73352e7
chore: update test to check parse call count
jaeseung-bae May 2, 2024
00ea71b
chore: apply feedback
jaeseung-bae May 9, 2024
33b67a4
chore: remove redundant call when useMethodMarshaller=true
jaeseung-bae May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.linecorp.armeria.client.grpc.GrpcClientOptions.MAX_INBOUND_MESSAGE_SIZE_BYTES;
import static com.linecorp.armeria.client.grpc.GrpcClientOptions.MAX_OUTBOUND_MESSAGE_SIZE_BYTES;
import static com.linecorp.armeria.client.grpc.GrpcClientOptions.UNSAFE_WRAP_RESPONSE_BUFFERS;
import static com.linecorp.armeria.client.grpc.GrpcClientOptions.USE_METHOD_MARSHALLER;
import static java.util.Objects.requireNonNull;

import java.net.URI;
Expand Down Expand Up @@ -81,6 +82,7 @@
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.DecompressorRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;

Expand Down Expand Up @@ -287,6 +289,16 @@ public GrpcClientBuilder enableUnsafeWrapResponseBuffers(boolean enableUnsafeWra
return option(UNSAFE_WRAP_RESPONSE_BUFFERS.newValue(enableUnsafeWrapResponseBuffers));
}

/**
* Sets whether to respect the marshaller specified in gRPC {@link MethodDescriptor}.
* If disabled, the default marshaller will be used, which is more efficient.
* This property is disabled by default.
*/
@UnstableApi
public GrpcClientBuilder useMethodMarshaller(boolean useMethodMarshaller) {
return option(USE_METHOD_MARSHALLER.newValue(useMethodMarshaller));
}
ikhoon marked this conversation as resolved.
Show resolved Hide resolved

/**
* Sets the factory that creates a {@link GrpcJsonMarshaller} that serializes and deserializes request or
* response messages to and from JSON depending on the {@link SerializationFormat}. The returned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.DecompressorRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;

Expand Down Expand Up @@ -176,5 +177,13 @@ public final class GrpcClientOptions {
ClientOption.define("EXCEPTION_HANDLER",
(ctx, cause, metadata) -> GrpcStatus.fromThrowable(cause));

/**
* Sets whether to respect the marshaller specified in gRPC {@link MethodDescriptor}.
* If disabled, the default marshaller will be used, which is more efficient.
* This option is disabled by default.
*/
public static final ClientOption<Boolean> USE_METHOD_MARSHALLER =
ClientOption.define("USE_METHOD_MARSHALLER", false);

private GrpcClientOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,24 @@ final class ArmeriaChannel extends Channel implements ClientBuilderParams, Unwra
private final DecompressorRegistry decompressorRegistry;
private final CallCredentials credentials0;
private final GrpcExceptionHandlerFunction exceptionHandler;
private final boolean useMethodMarshaller;

ArmeriaChannel(ClientBuilderParams params,
HttpClient httpClient,
MeterRegistry meterRegistry,
SessionProtocol sessionProtocol,
SerializationFormat serializationFormat,
@Nullable GrpcJsonMarshaller jsonMarshaller,
Map<MethodDescriptor<?, ?>, String> simpleMethodNames) {
Map<MethodDescriptor<?, ?>, String> simpleMethodNames,
boolean useMethodMarshaller) {
this.params = params;
this.httpClient = httpClient;
this.meterRegistry = meterRegistry;
this.sessionProtocol = sessionProtocol;
this.serializationFormat = serializationFormat;
this.jsonMarshaller = jsonMarshaller;
this.simpleMethodNames = simpleMethodNames;
this.useMethodMarshaller = useMethodMarshaller;

final ClientOptions options = options();
maxOutboundMessageSizeBytes = options.get(GrpcClientOptions.MAX_OUTBOUND_MESSAGE_SIZE_BYTES);
Expand Down Expand Up @@ -180,7 +183,8 @@ public <I, O> ClientCall<I, O> newCall(MethodDescriptor<I, O> method, CallOption
serializationFormat,
jsonMarshaller,
unsafeWrapResponseBuffers,
exceptionHandler);
exceptionHandler,
useMethodMarshaller);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ final class ArmeriaClientCall<I, O> extends ClientCall<I, O>
SerializationFormat serializationFormat,
@Nullable GrpcJsonMarshaller jsonMarshaller,
boolean unsafeWrapResponseBuffers,
GrpcExceptionHandlerFunction exceptionHandler) {
GrpcExceptionHandlerFunction exceptionHandler,
boolean useMethodMarshaller) {
this.ctx = ctx;
this.endpointGroup = endpointGroup;
this.httpClient = httpClient;
Expand All @@ -184,7 +185,7 @@ final class ArmeriaClientCall<I, O> extends ClientCall<I, O>

requestFramer = new ArmeriaMessageFramer(ctx.alloc(), maxOutboundMessageSizeBytes, grpcWebText);
marshaller = new GrpcMessageMarshaller<>(ctx.alloc(), serializationFormat, method, jsonMarshaller,
unsafeWrapResponseBuffers);
unsafeWrapResponseBuffers, useMethodMarshaller);

if (callOptions.getExecutor() == null) {
executor = MoreExecutors.directExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,11 @@ public Object newClient(ClientBuilderParams params) {
} else {
jsonMarshaller = null;
}
final boolean useMethodMarshaller = options.get(GrpcClientOptions.USE_METHOD_MARSHALLER);
minwoox marked this conversation as resolved.
Show resolved Hide resolved

final ArmeriaChannel armeriaChannel =
new ArmeriaChannel(newParams, httpClient, meterRegistry(), scheme.sessionProtocol(),
serializationFormat, jsonMarshaller, simpleMethodNames);
serializationFormat, jsonMarshaller, simpleMethodNames, useMethodMarshaller);
final Iterable<? extends ClientInterceptor> interceptors = options.get(GrpcClientOptions.INTERCEPTORS);
final Channel channel;
if (!Iterables.isEmpty(interceptors)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ private enum MessageType {
private final MessageType responseType;
private final boolean unsafeWrapDeserializedBuffer;
private final boolean isProto;
private final boolean useMethodMarshaller;

public GrpcMessageMarshaller(ByteBufAllocator alloc,
SerializationFormat serializationFormat,
MethodDescriptor<I, O> method,
@Nullable GrpcJsonMarshaller jsonMarshaller,
boolean unsafeWrapDeserializedBuffer) {
boolean unsafeWrapDeserializedBuffer,
boolean useMethodMarshaller) {
this.alloc = requireNonNull(alloc, "alloc");
this.method = requireNonNull(method, "method");
this.unsafeWrapDeserializedBuffer = unsafeWrapDeserializedBuffer;
Expand All @@ -84,6 +86,7 @@ public GrpcMessageMarshaller(ByteBufAllocator alloc,
responseMarshaller = method.getResponseMarshaller();
requestType = marshallerType(requestMarshaller);
responseType = marshallerType(responseMarshaller);
this.useMethodMarshaller = useMethodMarshaller;
}

public ByteBuf serializeRequest(I message) throws IOException {
Expand Down Expand Up @@ -203,8 +206,17 @@ private <T> ByteBuf serializeProto(PrototypeMarshaller<T> marshaller, Message me
final ByteBuf buf = alloc.buffer(serializedSize);
boolean success = false;
try {
message.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(0, serializedSize)));
buf.writerIndex(serializedSize);
if (useMethodMarshaller) {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
final InputStream is = marshaller.stream((T) message);
try (ByteBufOutputStream os = new ByteBufOutputStream(buf)) {
ByteStreams.copy(is, os);
} finally {
is.close();
}
} else {
message.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(0, serializedSize)));
buf.writerIndex(serializedSize);
}
success = true;
} finally {
if (!success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ protected AbstractServerCall(HttpRequest req,
ResponseHeaders defaultHeaders,
@Nullable GrpcExceptionHandlerFunction exceptionHandler,
@Nullable Executor blockingExecutor,
boolean autoCompression) {
boolean autoCompression,
boolean useMethodMarshaller) {
requireNonNull(req, "req");
this.method = requireNonNull(method, "method");
this.simpleMethodName = requireNonNull(simpleMethodName, "simpleMethodName");
Expand All @@ -170,7 +171,7 @@ protected AbstractServerCall(HttpRequest req,
clientAcceptEncoding = req.headers().get(GrpcHeaderNames.GRPC_ACCEPT_ENCODING, "");
this.autoCompression = autoCompression;
marshaller = new GrpcMessageMarshaller<>(alloc, serializationFormat, method, jsonMarshaller,
unsafeWrapRequestBuffers);
unsafeWrapRequestBuffers, useMethodMarshaller);
this.unsafeWrapRequestBuffers = unsafeWrapRequestBuffers;
this.blockingExecutor = blockingExecutor;
defaultResponseHeaders = defaultHeaders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ private static Map<String, GrpcJsonMarshaller> getJsonMarshallers(
private final boolean useBlockingTaskExecutor;
private final boolean unsafeWrapRequestBuffers;
private final boolean useClientTimeoutHeader;
private final boolean useMethodMarshaller;
private final String advertisedEncodingsHeader;
private final Map<SerializationFormat, ResponseHeaders> defaultHeaders;
@Nullable
Expand All @@ -154,7 +155,7 @@ private static Map<String, GrpcJsonMarshaller> getJsonMarshallers(
boolean useClientTimeoutHeader,
boolean lookupMethodFromAttribute,
@Nullable GrpcHealthCheckService grpcHealthCheckService,
boolean autoCompression) {
boolean autoCompression, boolean useMethodMarshaller) {
this.registry = requireNonNull(registry, "registry");
routes = ImmutableSet.copyOf(registry.methodsByRoute().keySet());
exchangeTypes = registry.methods().entrySet().stream()
Expand All @@ -173,6 +174,7 @@ private static Map<String, GrpcJsonMarshaller> getJsonMarshallers(
this.unsafeWrapRequestBuffers = unsafeWrapRequestBuffers;
this.lookupMethodFromAttribute = lookupMethodFromAttribute;
this.autoCompression = autoCompression;
this.useMethodMarshaller = useMethodMarshaller;

advertisedEncodingsHeader = String.join(",", decompressorRegistry.getAdvertisedMessageEncodings());

Expand Down Expand Up @@ -356,7 +358,8 @@ private <I, O> AbstractServerCall<I, O> newServerCall(
defaultHeaders.get(serializationFormat),
exceptionHandler,
blockingExecutor,
autoCompression);
autoCompression,
useMethodMarshaller);
} else {
return new StreamingServerCall<>(
req,
Expand All @@ -374,7 +377,8 @@ private <I, O> AbstractServerCall<I, O> newServerCall(
defaultHeaders.get(serializationFormat),
exceptionHandler,
blockingExecutor,
autoCompression);
autoCompression,
useMethodMarshaller);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public final class GrpcServiceBuilder {

private boolean useClientTimeoutHeader = true;

private boolean useMethodMarshaller;

private boolean enableHealthCheckService;

private boolean autoCompression;
Expand Down Expand Up @@ -824,6 +826,16 @@ public GrpcServiceBuilder autoCompression(boolean autoCompression) {
return this;
}

/**
* Sets whether to respect the marshaller specified in gRPC {@link MethodDescriptor}
* If not set, will use the default(false), which use more efficient way that reduce copy operation.
*/
@UnstableApi
public GrpcServiceBuilder useMethodMarshaller(boolean useMethodMarshaller) {
this.useMethodMarshaller = useMethodMarshaller;
return this;
}

/**
* Sets the specified {@link GrpcExceptionHandlerFunction} that maps a {@link Throwable}
* to a gRPC {@link Status}.
Expand Down Expand Up @@ -1016,7 +1028,8 @@ public GrpcService build() {
useClientTimeoutHeader,
enableHttpJsonTranscoding, // The method definition might be set when transcoding is enabled.
grpcHealthCheckService,
autoCompression);
autoCompression,
useMethodMarshaller);
if (enableUnframedRequests) {
grpcService = new UnframedGrpcService(
grpcService, handlerRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ final class StreamingServerCall<I, O> extends AbstractServerCall<I, O>
@Nullable GrpcJsonMarshaller jsonMarshaller, boolean unsafeWrapRequestBuffers,
ResponseHeaders defaultHeaders,
@Nullable GrpcExceptionHandlerFunction exceptionHandler,
@Nullable Executor blockingExecutor, boolean autoCompress) {
@Nullable Executor blockingExecutor, boolean autoCompress,
boolean useMethodMarshaller) {
super(req, method, simpleMethodName, compressorRegistry, decompressorRegistry, res,
maxResponseMessageLength, ctx, serializationFormat, jsonMarshaller, unsafeWrapRequestBuffers,
defaultHeaders, exceptionHandler, blockingExecutor, autoCompress);
defaultHeaders, exceptionHandler, blockingExecutor, autoCompress, useMethodMarshaller);
requireNonNull(req, "req");
this.method = requireNonNull(method, "method");
this.ctx = requireNonNull(ctx, "ctx");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ final class UnaryServerCall<I, O> extends AbstractServerCall<I, O> {
ResponseHeaders defaultHeaders,
@Nullable GrpcExceptionHandlerFunction exceptionHandler,
@Nullable Executor blockingExecutor,
boolean autoCompress) {
boolean autoCompress,
boolean useMethodMarshaller) {
super(req, method, simpleMethodName, compressorRegistry, decompressorRegistry, res,
maxResponseMessageLength, ctx, serializationFormat, jsonMarshaller, unsafeWrapRequestBuffers,
defaultHeaders, exceptionHandler, blockingExecutor, autoCompress);
defaultHeaders, exceptionHandler, blockingExecutor, autoCompress, useMethodMarshaller);
requireNonNull(req, "req");
this.ctx = requireNonNull(ctx, "ctx");
final boolean grpcWebText = GrpcSerializationFormats.isGrpcWebText(serializationFormat);
Expand Down