aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java')
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java385
1 files changed, 0 insertions, 385 deletions
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
deleted file mode 100644
index 4f43fa65..00000000
--- a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.incubator.rpc.grpc;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.stream.Collectors.toList;
-import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
-import static org.onosproject.net.DeviceId.deviceId;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.grpc.Device.DeviceConnected;
-import org.onosproject.grpc.Device.DeviceDisconnected;
-import org.onosproject.grpc.Device.DeviceProviderMsg;
-import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
-import org.onosproject.grpc.Device.IsReachableResponse;
-import org.onosproject.grpc.Device.PortStatusChanged;
-import org.onosproject.grpc.Device.ReceivedRoleReply;
-import org.onosproject.grpc.Device.RegisterProvider;
-import org.onosproject.grpc.Device.UpdatePortStatistics;
-import org.onosproject.grpc.Device.UpdatePorts;
-import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
-import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.DeviceProvider;
-import org.onosproject.net.device.DeviceProviderRegistry;
-import org.onosproject.net.device.DeviceProviderService;
-import org.onosproject.net.provider.ProviderId;
-import org.osgi.service.component.ComponentContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Sets;
-
-import io.grpc.Server;
-import io.grpc.netty.NettyServerBuilder;
-import io.grpc.stub.StreamObserver;
-
-// gRPC Server on Metro-side
-// Translates request received on RPC channel, and calls corresponding Service on
-// Metro-ONOS cluster.
-/**
- * Server side implementation of gRPC based RemoteService.
- */
-@Component(immediate = true)
-public class GrpcRemoteServiceServer {
-
- private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
-
- // TODO pick a number
- public static final int DEFAULT_LISTEN_PORT = 11984;
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceProviderRegistry deviceProviderRegistry;
-
-
- @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
- label = "Port to listen on")
- protected int listenPort = DEFAULT_LISTEN_PORT;
-
- private Server server;
- private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
-
- @Activate
- protected void activate(ComponentContext context) throws IOException {
- modified(context);
-
- log.debug("Server starting on {}", listenPort);
- try {
- server = NettyServerBuilder.forPort(listenPort)
- .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
- .build().start();
- } catch (IOException e) {
- log.error("Failed to start gRPC server", e);
- throw e;
- }
-
- log.info("Started on {}", listenPort);
- }
-
- @Deactivate
- protected void deactivate() {
-
- registeredProviders.stream()
- .forEach(deviceProviderRegistry::unregister);
-
- server.shutdown();
- // Should we wait for shutdown?
- log.info("Stopped");
- }
-
- @Modified
- public void modified(ComponentContext context) {
- // TODO support dynamic reconfiguration and restarting server?
- }
-
- // RPC Server-side code
- // RPC session Factory
- /**
- * Relays DeviceProviderRegistry calls from RPC client.
- */
- class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
-
- @Override
- public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
- log.trace("DeviceProviderRegistryServerProxy#register called!");
-
- DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
-
- return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
- }
- }
-
- // Lower -> Upper Controller message
- // RPC Server-side code
- // RPC session handler
- private final class DeviceProviderServiceServerProxy
- implements StreamObserver<DeviceProviderServiceMsg> {
-
- // intentionally shadowing
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final DeviceProviderServerProxy pairedProvider;
- private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
-
- private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
-
- // wrapped providerService
- private DeviceProviderService deviceProviderService;
-
-
- DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
- StreamObserver<DeviceProviderMsg> toDeviceProvider) {
- this.pairedProvider = provider;
- this.toDeviceProvider = toDeviceProvider;
- outstandingIsReachable = CacheBuilder.newBuilder()
- .expireAfterWrite(1, TimeUnit.MINUTES)
- .build();
-
- // pair RPC session in other direction
- provider.pair(this);
- }
-
- @Override
- public void onNext(DeviceProviderServiceMsg msg) {
- try {
- log.trace("DeviceProviderServiceServerProxy received: {}", msg);
- onMethod(msg);
- } catch (Exception e) {
- log.error("Exception thrown handling {}", msg, e);
- onError(e);
- throw e;
- }
- }
-
- /**
- * Translates received RPC message to {@link DeviceProviderService} method calls.
- * @param msg DeviceProviderService message
- */
- private void onMethod(DeviceProviderServiceMsg msg) {
- switch (msg.getMethodCase()) {
- case REGISTER_PROVIDER:
- RegisterProvider registerProvider = msg.getRegisterProvider();
- // TODO Do we care about provider name?
- pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
- registeredProviders.add(pairedProvider);
- deviceProviderService = deviceProviderRegistry.register(pairedProvider);
- break;
-
- case DEVICE_CONNECTED:
- DeviceConnected deviceConnected = msg.getDeviceConnected();
- deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
- translate(deviceConnected.getDeviceDescription()));
- break;
- case DEVICE_DISCONNECTED:
- DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
- deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
- break;
- case UPDATE_PORTS:
- UpdatePorts updatePorts = msg.getUpdatePorts();
- deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
- updatePorts.getPortDescriptionsList()
- .stream()
- .map(GrpcDeviceUtils::translate)
- .collect(toList()));
- break;
- case PORT_STATUS_CHANGED:
- PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
- deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
- translate(portStatusChanged.getPortDescription()));
- break;
- case RECEIVED_ROLE_REPLY:
- ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
- deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
- translate(receivedRoleReply.getRequested()),
- translate(receivedRoleReply.getResponse()));
- break;
- case UPDATE_PORT_STATISTICS:
- UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
- deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
- updatePortStatistics.getPortStatisticsList()
- .stream()
- .map(GrpcDeviceUtils::translate)
- .collect(toList()));
- break;
-
- // return value of DeviceProvider#isReachable
- case IS_REACHABLE_RESPONSE:
- IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
- int xid = isReachableResponse.getXid();
- boolean isReachable = isReachableResponse.getIsReachable();
- CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
- if (result != null) {
- result.complete(isReachable);
- }
- break;
-
- case METHOD_NOT_SET:
- default:
- log.warn("Unexpected message received {}", msg);
- break;
- }
- }
-
- @Override
- public void onCompleted() {
- log.info("DeviceProviderServiceServerProxy completed");
- deviceProviderRegistry.unregister(pairedProvider);
- registeredProviders.remove(pairedProvider);
- toDeviceProvider.onCompleted();
- }
-
- @Override
- public void onError(Throwable e) {
- log.error("DeviceProviderServiceServerProxy#onError", e);
- deviceProviderRegistry.unregister(pairedProvider);
- registeredProviders.remove(pairedProvider);
- // TODO What is the proper clean up for bi-di stream on error?
- // sample suggests no-op
- toDeviceProvider.onError(e);
- }
-
-
- /**
- * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
- * @param xid IsReachable call ID.
- * @param reply Future to
- */
- void register(int xid, CompletableFuture<Boolean> reply) {
- outstandingIsReachable.put(xid, reply);
- }
-
- }
-
- // Upper -> Lower Controller message
- /**
- * Relay DeviceProvider calls to RPC client.
- */
- private final class DeviceProviderServerProxy
- implements DeviceProvider {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- // xid for isReachable calls
- private final AtomicInteger xidPool = new AtomicInteger();
- private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
-
- private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
- private ProviderId providerId;
-
- DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
- this.toDeviceProvider = toDeviceProvider;
- }
-
- void setProviderId(ProviderId pid) {
- this.providerId = pid;
- }
-
- /**
- * Registers RPC stream in other direction.
- * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
- */
- void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
- this.deviceProviderServiceProxy = deviceProviderServiceProxy;
- }
-
- @Override
- public void triggerProbe(DeviceId deviceId) {
- log.trace("triggerProbe({})", deviceId);
- DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
- msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
- .setDeviceId(deviceId.toString())
- .build());
- DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
- toDeviceProvider.onNext(triggerProbeMsg);
- // TODO Catch Exceptions and call onError()
- }
-
- @Override
- public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
- log.trace("roleChanged({}, {})", deviceId, newRole);
- DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
- msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
- .setDeviceId(deviceId.toString())
- .setNewRole(translate(newRole))
- .build());
- toDeviceProvider.onNext(msgBuilder.build());
- // TODO Catch Exceptions and call onError()
- }
-
- @Override
- public boolean isReachable(DeviceId deviceId) {
- log.trace("isReachable({})", deviceId);
- CompletableFuture<Boolean> result = new CompletableFuture<>();
- final int xid = xidPool.incrementAndGet();
-
- DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
- msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
- .setXid(xid)
- .setDeviceId(deviceId.toString())
- .build());
-
- // Associate xid and register above future some where
- // in DeviceProviderService channel to receive reply
- if (deviceProviderServiceProxy != null) {
- deviceProviderServiceProxy.register(xid, result);
- }
-
- // send message down RPC
- toDeviceProvider.onNext(msgBuilder.build());
-
- // wait for reply
- try {
- return result.get(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.debug("isReachable({}) was Interrupted", deviceId, e);
- Thread.currentThread().interrupt();
- } catch (TimeoutException e) {
- log.warn("isReachable({}) Timed out", deviceId, e);
- } catch (ExecutionException e) {
- log.error("isReachable({}) Execution failed", deviceId, e);
- // close session?
- }
- return false;
- // TODO Catch Exceptions and call onError()
- }
-
- @Override
- public ProviderId id() {
- return checkNotNull(providerId, "not initialized yet");
- }
-
- }
-}