aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java')
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java119
1 files changed, 119 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java
new file mode 100644
index 00000000..74962187
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.incubator.rpc.RemoteServiceContext;
+import org.onosproject.incubator.rpc.RemoteServiceContextProvider;
+import org.onosproject.incubator.rpc.RemoteServiceContextProviderService;
+import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry;
+import org.onosproject.net.provider.ProviderId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+
+// gRPC Client side
+/**
+ * RemoteServiceContextProvider based on gRPC.
+ */
+@Component(immediate = true)
+public class GrpcRemoteServiceProvider implements RemoteServiceContextProvider {
+
+ public static final String GRPC_SCHEME = "grpc";
+
+ public static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
+
+ private static final ProviderId PID = new ProviderId(GRPC_SCHEME, RPC_PROVIDER_NAME);
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RemoteServiceProviderRegistry rpcRegistry;
+
+ private Map<URI, ManagedChannel> channels = new ConcurrentHashMap<>();
+
+ private RemoteServiceContextProviderService providerService;
+
+
+ @Activate
+ protected void activate() {
+ providerService = rpcRegistry.register(this);
+
+ // FIXME remove me. test code to see if gRPC loads in karaf
+ //getChannel(URI.create("grpc://localhost:8080"));
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ rpcRegistry.unregister(this);
+
+ // shutdown all channels
+ channels.values().stream()
+ .forEach(ManagedChannel::shutdown);
+ // Should we wait for shutdown? How?
+ channels.clear();
+ log.info("Stopped");
+ }
+
+ @Override
+ public ProviderId id() {
+ return PID;
+ }
+
+ @Override
+ public RemoteServiceContext get(URI uri) {
+ // Create gRPC client
+ return new GrpcRemoteServiceContext(getChannel(uri));
+ }
+
+ private ManagedChannel getChannel(URI uri) {
+ checkArgument(Objects.equals(GRPC_SCHEME, uri.getScheme()),
+ "Invalid URI scheme: %s", uri.getScheme());
+
+ return channels.compute(uri, (u, ch) -> {
+ if (ch != null && !ch.isShutdown()) {
+ return ch;
+ } else {
+ return createChannel(u);
+ }
+ });
+ }
+
+ private ManagedChannel createChannel(URI uri) {
+ log.debug("Creating channel for {}", uri);
+ return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort())
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .build();
+ }
+
+}