diff options
Diffstat (limited to 'framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java')
-rw-r--r-- | framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java new file mode 100644 index 00000000..74962187 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java @@ -0,0 +1,119 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.net.URI; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.onosproject.incubator.rpc.RemoteServiceContext; +import org.onosproject.incubator.rpc.RemoteServiceContextProvider; +import org.onosproject.incubator.rpc.RemoteServiceContextProviderService; +import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry; +import org.onosproject.net.provider.ProviderId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.ManagedChannel; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; + + +// gRPC Client side +/** + * RemoteServiceContextProvider based on gRPC. + */ +@Component(immediate = true) +public class GrpcRemoteServiceProvider implements RemoteServiceContextProvider { + + public static final String GRPC_SCHEME = "grpc"; + + public static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc"; + + private static final ProviderId PID = new ProviderId(GRPC_SCHEME, RPC_PROVIDER_NAME); + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected RemoteServiceProviderRegistry rpcRegistry; + + private Map<URI, ManagedChannel> channels = new ConcurrentHashMap<>(); + + private RemoteServiceContextProviderService providerService; + + + @Activate + protected void activate() { + providerService = rpcRegistry.register(this); + + // FIXME remove me. test code to see if gRPC loads in karaf + //getChannel(URI.create("grpc://localhost:8080")); + + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + rpcRegistry.unregister(this); + + // shutdown all channels + channels.values().stream() + .forEach(ManagedChannel::shutdown); + // Should we wait for shutdown? How? + channels.clear(); + log.info("Stopped"); + } + + @Override + public ProviderId id() { + return PID; + } + + @Override + public RemoteServiceContext get(URI uri) { + // Create gRPC client + return new GrpcRemoteServiceContext(getChannel(uri)); + } + + private ManagedChannel getChannel(URI uri) { + checkArgument(Objects.equals(GRPC_SCHEME, uri.getScheme()), + "Invalid URI scheme: %s", uri.getScheme()); + + return channels.compute(uri, (u, ch) -> { + if (ch != null && !ch.isShutdown()) { + return ch; + } else { + return createChannel(u); + } + }); + } + + private ManagedChannel createChannel(URI uri) { + log.debug("Creating channel for {}", uri); + return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) + .negotiationType(NegotiationType.PLAINTEXT) + .build(); + } + +} |