Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental gRPC server #16534

Draft
wants to merge 56 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
fd39955
grpc gradle dependencies
finnegancarroll Oct 24, 2024
fe4f62d
Draft for grpc transport from network plugin impl
finnegancarroll Oct 28, 2024
080de02
grpc needs defs from com.google.errorprone:error_prone_annotations
finnegancarroll Oct 28, 2024
a894746
guava to runtime dep
finnegancarroll Oct 29, 2024
3c05d8d
Remove redundant overrides
finnegancarroll Oct 29, 2024
82bead3
Start grpc server
finnegancarroll Oct 29, 2024
1dc81fc
Add gprc settings
finnegancarroll Oct 29, 2024
331a1e9
Channel related params not needed for grpc
finnegancarroll Oct 29, 2024
08967e7
Worker count from settings
finnegancarroll Oct 29, 2024
99f0184
For now use first port/bind host listed
finnegancarroll Oct 29, 2024
ead594b
Default grpc address
finnegancarroll Oct 30, 2024
648ab97
Initial bind logic for netty grpc
finnegancarroll Oct 30, 2024
d007b72
Use GrpcTransportSettings - fix port in use bug
finnegancarroll Oct 30, 2024
e7b16ea
Add perms for gRPC ports
finnegancarroll Oct 30, 2024
ad0f8f9
Add default health check service, perfmark - update gradle deps
finnegancarroll Oct 30, 2024
77cd64f
Add grpc info
finnegancarroll Oct 31, 2024
cf219c4
Move dep management to libs.versions.toml
finnegancarroll Oct 31, 2024
44272d0
Refactor resolvePublishPort to NetworkService
finnegancarroll Oct 31, 2024
6134c17
Feature flag GRPC_ENABLE_SETTING
finnegancarroll Oct 31, 2024
f9c5382
Add interceptor for grpc server stats
finnegancarroll Oct 31, 2024
4272dee
grpc stats interceptor to final
finnegancarroll Oct 31, 2024
b5fddb1
Add grpc dep sha1s
finnegancarroll Oct 31, 2024
9850513
Hide grpc bind perms behind feature flag
finnegancarroll Oct 31, 2024
016b158
Apache-2.0 lic boilerplate
finnegancarroll Oct 31, 2024
dfca35d
Spotless apply
finnegancarroll Oct 31, 2024
24a40ad
No dispatcher needed for grpc
finnegancarroll Oct 31, 2024
a8547dd
No dispatcher -> no thread prefix
finnegancarroll Oct 31, 2024
ca6b2d3
QOL commit - Enable gRPC feature flag. Unsigned.
finnegancarroll Nov 7, 2024
3bcd5bb
Add GrpcStats to NodeStats. Fix gRPC server stats interceptor prematu…
finnegancarroll Nov 7, 2024
b2a1078
Add GrpcInfo to node info api
finnegancarroll Nov 7, 2024
8d85806
Initial proto definitions for NodesStats/NodesInfo - Taken directly f…
finnegancarroll Nov 7, 2024
4b0744b
Exclude protos from license check
finnegancarroll Nov 7, 2024
8e684aa
Organize packages for generateProto task
finnegancarroll Nov 7, 2024
f88d5ba
Proto package housekeeping
finnegancarroll Nov 8, 2024
81503ce
Node info service impl function stubs.
finnegancarroll Nov 8, 2024
ee85066
Comment unimplemented func stubs
finnegancarroll Nov 8, 2024
d15118a
Fix RegisterTransportActionsRequest proto import
finnegancarroll Nov 11, 2024
00fa7ee
Nodes info service impl
finnegancarroll Nov 11, 2024
e2a0156
Bump versions for gRPC service stubs
finnegancarroll Nov 12, 2024
120c209
Remove guava from forbidden deps
finnegancarroll Nov 14, 2024
5038cf3
Add guava to compile classpath for proto generated code. Force standa…
finnegancarroll Nov 14, 2024
1e5d354
Remove guava specific config
finnegancarroll Nov 14, 2024
3b034cb
Inject service definition into netty module grpc server.
finnegancarroll Nov 14, 2024
00d4eb1
Add nodesInfo service dummy/unimpl response for testing.
finnegancarroll Nov 14, 2024
6151622
Rename gRPCServiceRegistry to GrpcServiceRegistry
andrross Nov 14, 2024
97efd81
Fill in functions tubs for NodesInfoServiceImpl - Note: Writing compl…
finnegancarroll Nov 15, 2024
26e8e1a
Pass cluster client to gRPC service impl
finnegancarroll Nov 15, 2024
b8d88d2
Remove streaming endpoint for NodesInfoService
finnegancarroll Nov 15, 2024
9804e80
Drop api spec generated proto - Source:https://github.com/amberzsy/op…
finnegancarroll Nov 15, 2024
b404cee
Comment source_includes parsing error
finnegancarroll Nov 15, 2024
d741356
Refactor proto folders to seperate spec generated and hand built. Fix…
finnegancarroll Nov 15, 2024
41e70eb
Migrate NodeInfo service to accommodate refactor.
finnegancarroll Nov 15, 2024
edddee9
Unused import
finnegancarroll Nov 15, 2024
56c8940
Service protos for all Request/Response pairs.
finnegancarroll Nov 15, 2024
06d1aa2
Add all DocumentService/SearchService function stubs.
finnegancarroll Nov 15, 2024
e985043
Pre-emptively add Search+Document services to GrpcServiceRegistry.
finnegancarroll Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ class LicenseHeadersTask extends AntTask {
// Generated resources
substringMatcher(licenseFamilyCategory: "GEN ",
licenseFamilyName: "Generated") {
// parsers generated by antlr
pattern(substring: "ANTLR GENERATED CODE")
// parsers generated by antlr
pattern(substring: "ANTLR GENERATED CODE")
// Protobuf
pattern(substring: "Generated by the protocol buffer compiler")
}

// Vendored Code
Expand Down
3 changes: 2 additions & 1 deletion gradle/forbidden-dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
// we do not want any of these dependencies on the compilation classpath
// because they could then be used within OpenSearch
List<String> FORBIDDEN_DEPENDENCIES = [
'guava'
// TODO: Why is guava forbidden?
// 'guava'
]

Closure checkDeps = { Configuration configuration ->
Expand Down
13 changes: 10 additions & 3 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ jettison = "1.5.4"
woodstox = "6.4.0"
kotlin = "1.7.10"
antlr4 = "4.13.1"
guava = "32.1.1-jre"
protobuf = "3.25.5"
jakarta_annotation = "1.3.5"
google_http_client = "1.44.1"
tdigest = "3.3"
hdrhistogram = "2.2.2"
grpc = "1.68.0"

# gRPC
grpc = "1.68.1"
protobuf = "3.25.5"
guava = "32.1.1-jre"
jsr305 = "3.0.2"
failureaccess = "1.0.1"
error_prone_annotations = "2.24.1"
javax_annotations = "1.3.2"
perfmark_api = "0.26.0"

# when updating the JNA version, also update the version in buildSrc/build.gradle
jna = "5.13.0"
Expand Down
16 changes: 16 additions & 0 deletions modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ dependencies {
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"

// gRPC
compileOnly "javax.annotation:javax.annotation-api:${versions.javax_annotations}"
compileOnly "com.google.code.findbugs:jsr305:${versions.jsr305}"
runtimeOnly "com.google.guava:guava:${versions.guava}"
api "com.google.guava:failureaccess:${versions.failureaccess}"
api "com.google.errorprone:error_prone_annotations:${versions.error_prone_annotations}"
api "io.perfmark:perfmark-api:${versions.perfmark_api}"
api "io.grpc:grpc-all:${versions.grpc}"
api "io.grpc:grpc-netty:${versions.grpc}"
api "io.grpc:grpc-api:${versions.grpc}"
api "io.grpc:grpc-protobuf-lite:${versions.grpc}"
api "io.grpc:grpc-protobuf:${versions.grpc}"
api "io.grpc:grpc-stub:${versions.grpc}"
api "io.grpc:grpc-core:${versions.grpc}"
api "io.grpc:grpc-services:${versions.grpc}"
}

restResources {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
32b299e45105aa9b0df8279c74dc1edfcf313ff0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
c4a06a64e650562f30b7bf9aaec1bfed43aca12b
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a150a5dac0c120f8da34d59c7730d4a5fe34ec8c
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9a9f25c58d8d5b0fcf37ae889a50fec87e34ac08
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e5630dfd653d7cad78caf7166e36973f55822e6c
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0ac762f09db8e74f9b17fff5f7f1bcf9a13c5620
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
b8c1772b35292a853f0707a3512090a8edad1fed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fe6a5349fd76e811c19f16e2b3e9453ee339df4b
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
b3aa84b0e7cbe4135d27644dd25e3f1cd4eeed85
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ebd89ad550b74724b1bbe8a04203921ebac5e6d4
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ad575652d84153075dd41ec6177ccb15251262b2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ef65452adaf20bf7d12ef55913aba24037b82738
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.grpc.netty4;

import io.grpc.BindableService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.PortsRange;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.grpc.AbstractGrpcServerTransport;
import org.opensearch.grpc.GrpcStats;
import org.opensearch.grpc.services.GrpcServiceRegistry;
import org.opensearch.transport.NettyAllocator;
import org.opensearch.transport.SharedGroupFactory;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.protobuf.services.ProtoReflectionService;

public class Netty4GrpcServerTransport extends AbstractGrpcServerTransport {
private static final Logger logger = LogManager.getLogger(Netty4GrpcServerTransport.class);

public static final Setting<Integer> SETTING_GRPC_WORKER_COUNT = Setting.intSetting("grpc.worker_count", 1, Setting.Property.NodeScope);

private final SharedGroupFactory sharedGroupFactory;
private final GrpcServiceRegistry grpcServiceRegistry;
private final CopyOnWriteArrayList<Server> servers = new CopyOnWriteArrayList<>();
private volatile SharedGroupFactory.SharedGroup sharedGroup;
private final ServerStatsInterceptor sharedServerStatsInterceptor;
private final AtomicLong currentOpen = new AtomicLong(0);
private final AtomicLong totalOpened = new AtomicLong(0);

public Netty4GrpcServerTransport(Settings settings, NetworkService networkService, SharedGroupFactory sharedGroupFactory, GrpcServiceRegistry grpcServiceRegistry) {
super(settings, networkService);
this.sharedGroupFactory = sharedGroupFactory;
this.sharedServerStatsInterceptor = new ServerStatsInterceptor(currentOpen, totalOpened);
this.grpcServiceRegistry = grpcServiceRegistry;
}

@Override
protected void doStart() {
boolean success = false;
try {
sharedGroup = sharedGroupFactory.getGRPCGroup();
bindServer();
success = true;
logger.info("Started gRPC server on port {}", port);
} finally {
if (!success) {
doStop();
}
}
}

@Override
protected TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRange) {
AtomicReference<Exception> lastException = new AtomicReference<>();
AtomicReference<TransportAddress> addr = new AtomicReference<>();

boolean success = portRange.iterate(portNumber -> {
try {
InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber);
NettyServerBuilder srvBuilder = NettyServerBuilder.forAddress(address)
.bossEventLoopGroup(sharedGroup.getLowLevelGroup())
.workerEventLoopGroup(sharedGroup.getLowLevelGroup())
.channelType(NettyAllocator.getServerChannelType())
.intercept(this.sharedServerStatsInterceptor)
.addService(new HealthStatusManager().getHealthService())
.addService(ProtoReflectionService.newInstance());

for (BindableService bService : grpcServiceRegistry.getServices()) {
srvBuilder.addService(bService);
}

Server srv = srvBuilder.build().start();
servers.add(srv);
addr.set(new TransportAddress(hostAddress, portNumber));
logger.debug("Bound gRPC to address {{}}", address);
return true;
} catch (Exception e) {
lastException.set(e);
return false;
}
});

if (!success) {
throw new RuntimeException("Failed to bind to " + hostAddress + " on ports " + portRange, lastException.get());
}

return addr.get();
}

@Override
protected void doStop() {
for (Server server : servers) {
if (server != null) {
server.shutdown();
try {
server.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while shutting down gRPC server");
} finally {
server.shutdownNow();
}
}

if (sharedGroup != null) {
sharedGroup.shutdown();
sharedGroup = null;
}
}
}

@Override
protected void doClose() {}

@Override
public GrpcStats stats() {
return new GrpcStats(totalOpened.get(), currentOpen.get());
}

static class ServerStatsInterceptor implements ServerInterceptor {
private final AtomicLong currentOpen;
private final AtomicLong totalOpened;

ServerStatsInterceptor(AtomicLong currentOpen, AtomicLong totalOpened) {
this.currentOpen = currentOpen;
this.totalOpened = totalOpened;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
logger.debug("Intercepted call - Method: {}, Authority: {}, Headers: {}",
call.getMethodDescriptor().getFullMethodName(),
call.getAuthority(),
headers);

currentOpen.incrementAndGet();
totalOpened.incrementAndGet();

return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
currentOpen.decrementAndGet();
super.close(status, trailers);
}
}, headers)
) {
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.grpc.GrpcServerTransport;
import org.opensearch.grpc.netty4.Netty4GrpcServerTransport;
import org.opensearch.grpc.services.GrpcServiceRegistry;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.netty4.Netty4HttpServerTransport;
import org.opensearch.http.netty4.ssl.SecureNetty4HttpServerTransport;
Expand All @@ -68,6 +71,7 @@ public class Netty4ModulePlugin extends Plugin implements NetworkPlugin {
public static final String NETTY_SECURE_TRANSPORT_NAME = "netty4-secure";
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";
public static final String NETTY_SECURE_HTTP_TRANSPORT_NAME = "netty4-secure";
public static final String NETTY_GRPC_TRANSPORT_NAME = "netty4-grpc";

private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();

Expand Down Expand Up @@ -150,6 +154,14 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
);
}

@Override
public Map<String, Supplier<GrpcServerTransport>> getGrpcTransports(Settings settings, NetworkService networkService, GrpcServiceRegistry grpcServiceRegistry) {
return Collections.singletonMap(
NETTY_GRPC_TRANSPORT_NAME,
() -> new Netty4GrpcServerTransport(settings, networkService, getSharedGroupFactory(settings), grpcServiceRegistry)
);
}

@Override
public Map<String, Supplier<HttpServerTransport>> getSecureHttpTransports(
Settings settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.grpc.netty4.Netty4GrpcServerTransport;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.netty4.Netty4HttpServerTransport;
import org.opensearch.transport.netty4.Netty4Transport;
Expand All @@ -62,14 +63,17 @@ public final class SharedGroupFactory {
private final Settings settings;
private final int workerCount;
private final int httpWorkerCount;
private final int grpcWorkerCount;

private RefCountedGroup genericGroup;
private SharedGroup dedicatedHttpGroup;
private SharedGroup dedicatedGRPCGroup;

public SharedGroupFactory(Settings settings) {
this.settings = settings;
this.workerCount = Netty4Transport.WORKER_COUNT.get(settings);
this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings);
this.grpcWorkerCount = Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.get(settings);
}

public Settings getSettings() {
Expand Down Expand Up @@ -99,6 +103,21 @@ public synchronized SharedGroup getHttpGroup() {
}
}

public synchronized SharedGroup getGRPCGroup() {
if (grpcWorkerCount == 0) {
return getGenericGroup();
} else {
if (dedicatedGRPCGroup == null) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(
grpcWorkerCount,
daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)
);
dedicatedGRPCGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup));
}
return dedicatedGRPCGroup;
}
}

private SharedGroup getGenericGroup() {
if (genericGroup == null) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(
Expand Down
Loading
Loading