diff options
Diffstat (limited to 'framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java')
-rw-r--r-- | framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java | 385 |
1 files changed, 385 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java new file mode 100644 index 00000000..4f43fa65 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java @@ -0,0 +1,385 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.stream.Collectors.toList; +import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate; +import static org.onosproject.net.DeviceId.deviceId; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Modified; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.onosproject.grpc.Device.DeviceConnected; +import org.onosproject.grpc.Device.DeviceDisconnected; +import org.onosproject.grpc.Device.DeviceProviderMsg; +import org.onosproject.grpc.Device.DeviceProviderServiceMsg; +import org.onosproject.grpc.Device.IsReachableResponse; +import org.onosproject.grpc.Device.PortStatusChanged; +import org.onosproject.grpc.Device.ReceivedRoleReply; +import org.onosproject.grpc.Device.RegisterProvider; +import org.onosproject.grpc.Device.UpdatePortStatistics; +import org.onosproject.grpc.Device.UpdatePorts; +import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc; +import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc; +import org.onosproject.net.DeviceId; +import org.onosproject.net.MastershipRole; +import org.onosproject.net.device.DeviceProvider; +import org.onosproject.net.device.DeviceProviderRegistry; +import org.onosproject.net.device.DeviceProviderService; +import org.onosproject.net.provider.ProviderId; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Sets; + +import io.grpc.Server; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; + +// gRPC Server on Metro-side +// Translates request received on RPC channel, and calls corresponding Service on +// Metro-ONOS cluster. +/** + * Server side implementation of gRPC based RemoteService. + */ +@Component(immediate = true) +public class GrpcRemoteServiceServer { + + private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc"; + + // TODO pick a number + public static final int DEFAULT_LISTEN_PORT = 11984; + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceProviderRegistry deviceProviderRegistry; + + + @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT, + label = "Port to listen on") + protected int listenPort = DEFAULT_LISTEN_PORT; + + private Server server; + private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet(); + + @Activate + protected void activate(ComponentContext context) throws IOException { + modified(context); + + log.debug("Server starting on {}", listenPort); + try { + server = NettyServerBuilder.forPort(listenPort) + .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy())) + .build().start(); + } catch (IOException e) { + log.error("Failed to start gRPC server", e); + throw e; + } + + log.info("Started on {}", listenPort); + } + + @Deactivate + protected void deactivate() { + + registeredProviders.stream() + .forEach(deviceProviderRegistry::unregister); + + server.shutdown(); + // Should we wait for shutdown? + log.info("Stopped"); + } + + @Modified + public void modified(ComponentContext context) { + // TODO support dynamic reconfiguration and restarting server? + } + + // RPC Server-side code + // RPC session Factory + /** + * Relays DeviceProviderRegistry calls from RPC client. + */ + class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc { + + @Override + public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) { + log.trace("DeviceProviderRegistryServerProxy#register called!"); + + DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider); + + return new DeviceProviderServiceServerProxy(provider, toDeviceProvider); + } + } + + // Lower -> Upper Controller message + // RPC Server-side code + // RPC session handler + private final class DeviceProviderServiceServerProxy + implements StreamObserver<DeviceProviderServiceMsg> { + + // intentionally shadowing + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final DeviceProviderServerProxy pairedProvider; + private final StreamObserver<DeviceProviderMsg> toDeviceProvider; + + private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable; + + // wrapped providerService + private DeviceProviderService deviceProviderService; + + + DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider, + StreamObserver<DeviceProviderMsg> toDeviceProvider) { + this.pairedProvider = provider; + this.toDeviceProvider = toDeviceProvider; + outstandingIsReachable = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(); + + // pair RPC session in other direction + provider.pair(this); + } + + @Override + public void onNext(DeviceProviderServiceMsg msg) { + try { + log.trace("DeviceProviderServiceServerProxy received: {}", msg); + onMethod(msg); + } catch (Exception e) { + log.error("Exception thrown handling {}", msg, e); + onError(e); + throw e; + } + } + + /** + * Translates received RPC message to {@link DeviceProviderService} method calls. + * @param msg DeviceProviderService message + */ + private void onMethod(DeviceProviderServiceMsg msg) { + switch (msg.getMethodCase()) { + case REGISTER_PROVIDER: + RegisterProvider registerProvider = msg.getRegisterProvider(); + // TODO Do we care about provider name? + pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME)); + registeredProviders.add(pairedProvider); + deviceProviderService = deviceProviderRegistry.register(pairedProvider); + break; + + case DEVICE_CONNECTED: + DeviceConnected deviceConnected = msg.getDeviceConnected(); + deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()), + translate(deviceConnected.getDeviceDescription())); + break; + case DEVICE_DISCONNECTED: + DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected(); + deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId())); + break; + case UPDATE_PORTS: + UpdatePorts updatePorts = msg.getUpdatePorts(); + deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()), + updatePorts.getPortDescriptionsList() + .stream() + .map(GrpcDeviceUtils::translate) + .collect(toList())); + break; + case PORT_STATUS_CHANGED: + PortStatusChanged portStatusChanged = msg.getPortStatusChanged(); + deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()), + translate(portStatusChanged.getPortDescription())); + break; + case RECEIVED_ROLE_REPLY: + ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply(); + deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()), + translate(receivedRoleReply.getRequested()), + translate(receivedRoleReply.getResponse())); + break; + case UPDATE_PORT_STATISTICS: + UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics(); + deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()), + updatePortStatistics.getPortStatisticsList() + .stream() + .map(GrpcDeviceUtils::translate) + .collect(toList())); + break; + + // return value of DeviceProvider#isReachable + case IS_REACHABLE_RESPONSE: + IsReachableResponse isReachableResponse = msg.getIsReachableResponse(); + int xid = isReachableResponse.getXid(); + boolean isReachable = isReachableResponse.getIsReachable(); + CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid); + if (result != null) { + result.complete(isReachable); + } + break; + + case METHOD_NOT_SET: + default: + log.warn("Unexpected message received {}", msg); + break; + } + } + + @Override + public void onCompleted() { + log.info("DeviceProviderServiceServerProxy completed"); + deviceProviderRegistry.unregister(pairedProvider); + registeredProviders.remove(pairedProvider); + toDeviceProvider.onCompleted(); + } + + @Override + public void onError(Throwable e) { + log.error("DeviceProviderServiceServerProxy#onError", e); + deviceProviderRegistry.unregister(pairedProvider); + registeredProviders.remove(pairedProvider); + // TODO What is the proper clean up for bi-di stream on error? + // sample suggests no-op + toDeviceProvider.onError(e); + } + + + /** + * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value. + * @param xid IsReachable call ID. + * @param reply Future to + */ + void register(int xid, CompletableFuture<Boolean> reply) { + outstandingIsReachable.put(xid, reply); + } + + } + + // Upper -> Lower Controller message + /** + * Relay DeviceProvider calls to RPC client. + */ + private final class DeviceProviderServerProxy + implements DeviceProvider { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + // xid for isReachable calls + private final AtomicInteger xidPool = new AtomicInteger(); + private final StreamObserver<DeviceProviderMsg> toDeviceProvider; + + private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null; + private ProviderId providerId; + + DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) { + this.toDeviceProvider = toDeviceProvider; + } + + void setProviderId(ProviderId pid) { + this.providerId = pid; + } + + /** + * Registers RPC stream in other direction. + * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy} + */ + void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) { + this.deviceProviderServiceProxy = deviceProviderServiceProxy; + } + + @Override + public void triggerProbe(DeviceId deviceId) { + log.trace("triggerProbe({})", deviceId); + DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); + msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder() + .setDeviceId(deviceId.toString()) + .build()); + DeviceProviderMsg triggerProbeMsg = msgBuilder.build(); + toDeviceProvider.onNext(triggerProbeMsg); + // TODO Catch Exceptions and call onError() + } + + @Override + public void roleChanged(DeviceId deviceId, MastershipRole newRole) { + log.trace("roleChanged({}, {})", deviceId, newRole); + DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); + msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder() + .setDeviceId(deviceId.toString()) + .setNewRole(translate(newRole)) + .build()); + toDeviceProvider.onNext(msgBuilder.build()); + // TODO Catch Exceptions and call onError() + } + + @Override + public boolean isReachable(DeviceId deviceId) { + log.trace("isReachable({})", deviceId); + CompletableFuture<Boolean> result = new CompletableFuture<>(); + final int xid = xidPool.incrementAndGet(); + + DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); + msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder() + .setXid(xid) + .setDeviceId(deviceId.toString()) + .build()); + + // Associate xid and register above future some where + // in DeviceProviderService channel to receive reply + if (deviceProviderServiceProxy != null) { + deviceProviderServiceProxy.register(xid, result); + } + + // send message down RPC + toDeviceProvider.onNext(msgBuilder.build()); + + // wait for reply + try { + return result.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.debug("isReachable({}) was Interrupted", deviceId, e); + Thread.currentThread().interrupt(); + } catch (TimeoutException e) { + log.warn("isReachable({}) Timed out", deviceId, e); + } catch (ExecutionException e) { + log.error("isReachable({}) Execution failed", deviceId, e); + // close session? + } + return false; + // TODO Catch Exceptions and call onError() + } + + @Override + public ProviderId id() { + return checkNotNull(providerId, "not initialized yet"); + } + + } +} |