/* * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onosproject.incubator.rpc.grpc; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.stream.Collectors.toList; import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate; import static org.onosproject.net.DeviceId.deviceId; import java.io.IOException; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Modified; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.onosproject.grpc.Device.DeviceConnected; import org.onosproject.grpc.Device.DeviceDisconnected; import org.onosproject.grpc.Device.DeviceProviderMsg; import org.onosproject.grpc.Device.DeviceProviderServiceMsg; import org.onosproject.grpc.Device.IsReachableResponse; import org.onosproject.grpc.Device.PortStatusChanged; import org.onosproject.grpc.Device.ReceivedRoleReply; import org.onosproject.grpc.Device.RegisterProvider; import org.onosproject.grpc.Device.UpdatePortStatistics; import org.onosproject.grpc.Device.UpdatePorts; import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc; import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc; import org.onosproject.net.DeviceId; import org.onosproject.net.MastershipRole; import org.onosproject.net.device.DeviceProvider; import org.onosproject.net.device.DeviceProviderRegistry; import org.onosproject.net.device.DeviceProviderService; import org.onosproject.net.provider.ProviderId; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Sets; import io.grpc.Server; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; // gRPC Server on Metro-side // Translates request received on RPC channel, and calls corresponding Service on // Metro-ONOS cluster. /** * Server side implementation of gRPC based RemoteService. */ @Component(immediate = true) public class GrpcRemoteServiceServer { private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc"; // TODO pick a number public static final int DEFAULT_LISTEN_PORT = 11984; private final Logger log = LoggerFactory.getLogger(getClass()); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceProviderRegistry deviceProviderRegistry; @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT, label = "Port to listen on") protected int listenPort = DEFAULT_LISTEN_PORT; private Server server; private final Set registeredProviders = Sets.newConcurrentHashSet(); @Activate protected void activate(ComponentContext context) throws IOException { modified(context); log.debug("Server starting on {}", listenPort); try { server = NettyServerBuilder.forPort(listenPort) .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy())) .build().start(); } catch (IOException e) { log.error("Failed to start gRPC server", e); throw e; } log.info("Started on {}", listenPort); } @Deactivate protected void deactivate() { registeredProviders.stream() .forEach(deviceProviderRegistry::unregister); server.shutdown(); // Should we wait for shutdown? log.info("Stopped"); } @Modified public void modified(ComponentContext context) { // TODO support dynamic reconfiguration and restarting server? } // RPC Server-side code // RPC session Factory /** * Relays DeviceProviderRegistry calls from RPC client. */ class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc { @Override public StreamObserver register(StreamObserver toDeviceProvider) { log.trace("DeviceProviderRegistryServerProxy#register called!"); DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider); return new DeviceProviderServiceServerProxy(provider, toDeviceProvider); } } // Lower -> Upper Controller message // RPC Server-side code // RPC session handler private final class DeviceProviderServiceServerProxy implements StreamObserver { // intentionally shadowing private final Logger log = LoggerFactory.getLogger(getClass()); private final DeviceProviderServerProxy pairedProvider; private final StreamObserver toDeviceProvider; private final Cache> outstandingIsReachable; // wrapped providerService private DeviceProviderService deviceProviderService; DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider, StreamObserver toDeviceProvider) { this.pairedProvider = provider; this.toDeviceProvider = toDeviceProvider; outstandingIsReachable = CacheBuilder.newBuilder() .expireAfterWrite(1, TimeUnit.MINUTES) .build(); // pair RPC session in other direction provider.pair(this); } @Override public void onNext(DeviceProviderServiceMsg msg) { try { log.trace("DeviceProviderServiceServerProxy received: {}", msg); onMethod(msg); } catch (Exception e) { log.error("Exception thrown handling {}", msg, e); onError(e); throw e; } } /** * Translates received RPC message to {@link DeviceProviderService} method calls. * @param msg DeviceProviderService message */ private void onMethod(DeviceProviderServiceMsg msg) { switch (msg.getMethodCase()) { case REGISTER_PROVIDER: RegisterProvider registerProvider = msg.getRegisterProvider(); // TODO Do we care about provider name? pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME)); registeredProviders.add(pairedProvider); deviceProviderService = deviceProviderRegistry.register(pairedProvider); break; case DEVICE_CONNECTED: DeviceConnected deviceConnected = msg.getDeviceConnected(); deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()), translate(deviceConnected.getDeviceDescription())); break; case DEVICE_DISCONNECTED: DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected(); deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId())); break; case UPDATE_PORTS: UpdatePorts updatePorts = msg.getUpdatePorts(); deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()), updatePorts.getPortDescriptionsList() .stream() .map(GrpcDeviceUtils::translate) .collect(toList())); break; case PORT_STATUS_CHANGED: PortStatusChanged portStatusChanged = msg.getPortStatusChanged(); deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()), translate(portStatusChanged.getPortDescription())); break; case RECEIVED_ROLE_REPLY: ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply(); deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()), translate(receivedRoleReply.getRequested()), translate(receivedRoleReply.getResponse())); break; case UPDATE_PORT_STATISTICS: UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics(); deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()), updatePortStatistics.getPortStatisticsList() .stream() .map(GrpcDeviceUtils::translate) .collect(toList())); break; // return value of DeviceProvider#isReachable case IS_REACHABLE_RESPONSE: IsReachableResponse isReachableResponse = msg.getIsReachableResponse(); int xid = isReachableResponse.getXid(); boolean isReachable = isReachableResponse.getIsReachable(); CompletableFuture result = outstandingIsReachable.asMap().remove(xid); if (result != null) { result.complete(isReachable); } break; case METHOD_NOT_SET: default: log.warn("Unexpected message received {}", msg); break; } } @Override public void onCompleted() { log.info("DeviceProviderServiceServerProxy completed"); deviceProviderRegistry.unregister(pairedProvider); registeredProviders.remove(pairedProvider); toDeviceProvider.onCompleted(); } @Override public void onError(Throwable e) { log.error("DeviceProviderServiceServerProxy#onError", e); deviceProviderRegistry.unregister(pairedProvider); registeredProviders.remove(pairedProvider); // TODO What is the proper clean up for bi-di stream on error? // sample suggests no-op toDeviceProvider.onError(e); } /** * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value. * @param xid IsReachable call ID. * @param reply Future to */ void register(int xid, CompletableFuture reply) { outstandingIsReachable.put(xid, reply); } } // Upper -> Lower Controller message /** * Relay DeviceProvider calls to RPC client. */ private final class DeviceProviderServerProxy implements DeviceProvider { private final Logger log = LoggerFactory.getLogger(getClass()); // xid for isReachable calls private final AtomicInteger xidPool = new AtomicInteger(); private final StreamObserver toDeviceProvider; private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null; private ProviderId providerId; DeviceProviderServerProxy(StreamObserver toDeviceProvider) { this.toDeviceProvider = toDeviceProvider; } void setProviderId(ProviderId pid) { this.providerId = pid; } /** * Registers RPC stream in other direction. * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy} */ void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) { this.deviceProviderServiceProxy = deviceProviderServiceProxy; } @Override public void triggerProbe(DeviceId deviceId) { log.trace("triggerProbe({})", deviceId); DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder() .setDeviceId(deviceId.toString()) .build()); DeviceProviderMsg triggerProbeMsg = msgBuilder.build(); toDeviceProvider.onNext(triggerProbeMsg); // TODO Catch Exceptions and call onError() } @Override public void roleChanged(DeviceId deviceId, MastershipRole newRole) { log.trace("roleChanged({}, {})", deviceId, newRole); DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder() .setDeviceId(deviceId.toString()) .setNewRole(translate(newRole)) .build()); toDeviceProvider.onNext(msgBuilder.build()); // TODO Catch Exceptions and call onError() } @Override public boolean isReachable(DeviceId deviceId) { log.trace("isReachable({})", deviceId); CompletableFuture result = new CompletableFuture<>(); final int xid = xidPool.incrementAndGet(); DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder() .setXid(xid) .setDeviceId(deviceId.toString()) .build()); // Associate xid and register above future some where // in DeviceProviderService channel to receive reply if (deviceProviderServiceProxy != null) { deviceProviderServiceProxy.register(xid, result); } // send message down RPC toDeviceProvider.onNext(msgBuilder.build()); // wait for reply try { return result.get(10, TimeUnit.SECONDS); } catch (InterruptedException e) { log.debug("isReachable({}) was Interrupted", deviceId, e); Thread.currentThread().interrupt(); } catch (TimeoutException e) { log.warn("isReachable({}) Timed out", deviceId, e); } catch (ExecutionException e) { log.error("isReachable({}) Execution failed", deviceId, e); // close session? } return false; // TODO Catch Exceptions and call onError() } @Override public ProviderId id() { return checkNotNull(providerId, "not initialized yet"); } } }