aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java')
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java385
1 files changed, 385 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
new file mode 100644
index 00000000..4f43fa65
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
@@ -0,0 +1,385 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.stream.Collectors.toList;
+import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
+import static org.onosproject.net.DeviceId.deviceId;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.grpc.Device.DeviceConnected;
+import org.onosproject.grpc.Device.DeviceDisconnected;
+import org.onosproject.grpc.Device.DeviceProviderMsg;
+import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
+import org.onosproject.grpc.Device.IsReachableResponse;
+import org.onosproject.grpc.Device.PortStatusChanged;
+import org.onosproject.grpc.Device.ReceivedRoleReply;
+import org.onosproject.grpc.Device.RegisterProvider;
+import org.onosproject.grpc.Device.UpdatePortStatistics;
+import org.onosproject.grpc.Device.UpdatePorts;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.ProviderId;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+// gRPC Server on Metro-side
+// Translates request received on RPC channel, and calls corresponding Service on
+// Metro-ONOS cluster.
+/**
+ * Server side implementation of gRPC based RemoteService.
+ */
+@Component(immediate = true)
+public class GrpcRemoteServiceServer {
+
+ private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
+
+ // TODO pick a number
+ public static final int DEFAULT_LISTEN_PORT = 11984;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceProviderRegistry deviceProviderRegistry;
+
+
+ @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
+ label = "Port to listen on")
+ protected int listenPort = DEFAULT_LISTEN_PORT;
+
+ private Server server;
+ private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
+
+ @Activate
+ protected void activate(ComponentContext context) throws IOException {
+ modified(context);
+
+ log.debug("Server starting on {}", listenPort);
+ try {
+ server = NettyServerBuilder.forPort(listenPort)
+ .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
+ .build().start();
+ } catch (IOException e) {
+ log.error("Failed to start gRPC server", e);
+ throw e;
+ }
+
+ log.info("Started on {}", listenPort);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+
+ registeredProviders.stream()
+ .forEach(deviceProviderRegistry::unregister);
+
+ server.shutdown();
+ // Should we wait for shutdown?
+ log.info("Stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ // TODO support dynamic reconfiguration and restarting server?
+ }
+
+ // RPC Server-side code
+ // RPC session Factory
+ /**
+ * Relays DeviceProviderRegistry calls from RPC client.
+ */
+ class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
+
+ @Override
+ public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ log.trace("DeviceProviderRegistryServerProxy#register called!");
+
+ DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
+
+ return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
+ }
+ }
+
+ // Lower -> Upper Controller message
+ // RPC Server-side code
+ // RPC session handler
+ private final class DeviceProviderServiceServerProxy
+ implements StreamObserver<DeviceProviderServiceMsg> {
+
+ // intentionally shadowing
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final DeviceProviderServerProxy pairedProvider;
+ private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+ private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
+
+ // wrapped providerService
+ private DeviceProviderService deviceProviderService;
+
+
+ DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
+ StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ this.pairedProvider = provider;
+ this.toDeviceProvider = toDeviceProvider;
+ outstandingIsReachable = CacheBuilder.newBuilder()
+ .expireAfterWrite(1, TimeUnit.MINUTES)
+ .build();
+
+ // pair RPC session in other direction
+ provider.pair(this);
+ }
+
+ @Override
+ public void onNext(DeviceProviderServiceMsg msg) {
+ try {
+ log.trace("DeviceProviderServiceServerProxy received: {}", msg);
+ onMethod(msg);
+ } catch (Exception e) {
+ log.error("Exception thrown handling {}", msg, e);
+ onError(e);
+ throw e;
+ }
+ }
+
+ /**
+ * Translates received RPC message to {@link DeviceProviderService} method calls.
+ * @param msg DeviceProviderService message
+ */
+ private void onMethod(DeviceProviderServiceMsg msg) {
+ switch (msg.getMethodCase()) {
+ case REGISTER_PROVIDER:
+ RegisterProvider registerProvider = msg.getRegisterProvider();
+ // TODO Do we care about provider name?
+ pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
+ registeredProviders.add(pairedProvider);
+ deviceProviderService = deviceProviderRegistry.register(pairedProvider);
+ break;
+
+ case DEVICE_CONNECTED:
+ DeviceConnected deviceConnected = msg.getDeviceConnected();
+ deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
+ translate(deviceConnected.getDeviceDescription()));
+ break;
+ case DEVICE_DISCONNECTED:
+ DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
+ deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
+ break;
+ case UPDATE_PORTS:
+ UpdatePorts updatePorts = msg.getUpdatePorts();
+ deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
+ updatePorts.getPortDescriptionsList()
+ .stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList()));
+ break;
+ case PORT_STATUS_CHANGED:
+ PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
+ deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
+ translate(portStatusChanged.getPortDescription()));
+ break;
+ case RECEIVED_ROLE_REPLY:
+ ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
+ deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
+ translate(receivedRoleReply.getRequested()),
+ translate(receivedRoleReply.getResponse()));
+ break;
+ case UPDATE_PORT_STATISTICS:
+ UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
+ deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
+ updatePortStatistics.getPortStatisticsList()
+ .stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList()));
+ break;
+
+ // return value of DeviceProvider#isReachable
+ case IS_REACHABLE_RESPONSE:
+ IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
+ int xid = isReachableResponse.getXid();
+ boolean isReachable = isReachableResponse.getIsReachable();
+ CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
+ if (result != null) {
+ result.complete(isReachable);
+ }
+ break;
+
+ case METHOD_NOT_SET:
+ default:
+ log.warn("Unexpected message received {}", msg);
+ break;
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("DeviceProviderServiceServerProxy completed");
+ deviceProviderRegistry.unregister(pairedProvider);
+ registeredProviders.remove(pairedProvider);
+ toDeviceProvider.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ log.error("DeviceProviderServiceServerProxy#onError", e);
+ deviceProviderRegistry.unregister(pairedProvider);
+ registeredProviders.remove(pairedProvider);
+ // TODO What is the proper clean up for bi-di stream on error?
+ // sample suggests no-op
+ toDeviceProvider.onError(e);
+ }
+
+
+ /**
+ * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
+ * @param xid IsReachable call ID.
+ * @param reply Future to
+ */
+ void register(int xid, CompletableFuture<Boolean> reply) {
+ outstandingIsReachable.put(xid, reply);
+ }
+
+ }
+
+ // Upper -> Lower Controller message
+ /**
+ * Relay DeviceProvider calls to RPC client.
+ */
+ private final class DeviceProviderServerProxy
+ implements DeviceProvider {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ // xid for isReachable calls
+ private final AtomicInteger xidPool = new AtomicInteger();
+ private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+ private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
+ private ProviderId providerId;
+
+ DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ this.toDeviceProvider = toDeviceProvider;
+ }
+
+ void setProviderId(ProviderId pid) {
+ this.providerId = pid;
+ }
+
+ /**
+ * Registers RPC stream in other direction.
+ * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
+ */
+ void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
+ this.deviceProviderServiceProxy = deviceProviderServiceProxy;
+ }
+
+ @Override
+ public void triggerProbe(DeviceId deviceId) {
+ log.trace("triggerProbe({})", deviceId);
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
+ .setDeviceId(deviceId.toString())
+ .build());
+ DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
+ toDeviceProvider.onNext(triggerProbeMsg);
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+ log.trace("roleChanged({}, {})", deviceId, newRole);
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
+ .setDeviceId(deviceId.toString())
+ .setNewRole(translate(newRole))
+ .build());
+ toDeviceProvider.onNext(msgBuilder.build());
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public boolean isReachable(DeviceId deviceId) {
+ log.trace("isReachable({})", deviceId);
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
+ final int xid = xidPool.incrementAndGet();
+
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
+ .setXid(xid)
+ .setDeviceId(deviceId.toString())
+ .build());
+
+ // Associate xid and register above future some where
+ // in DeviceProviderService channel to receive reply
+ if (deviceProviderServiceProxy != null) {
+ deviceProviderServiceProxy.register(xid, result);
+ }
+
+ // send message down RPC
+ toDeviceProvider.onNext(msgBuilder.build());
+
+ // wait for reply
+ try {
+ return result.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.debug("isReachable({}) was Interrupted", deviceId, e);
+ Thread.currentThread().interrupt();
+ } catch (TimeoutException e) {
+ log.warn("isReachable({}) Timed out", deviceId, e);
+ } catch (ExecutionException e) {
+ log.error("isReachable({}) Execution failed", deviceId, e);
+ // close session?
+ }
+ return false;
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public ProviderId id() {
+ return checkNotNull(providerId, "not initialized yet");
+ }
+
+ }
+}