aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject')
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java77
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java295
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java381
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java75
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java119
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java385
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java20
7 files changed, 1352 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java
new file mode 100644
index 00000000..cad0fbb6
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import java.util.Map;
+
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.AbstractProviderRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import io.grpc.Channel;
+import io.grpc.ManagedChannel;
+
+// gRPC Client side
+/**
+ * Proxy object to handle DeviceProviderRegistry calls.
+ *
+ * RPC wise, this will start/stop bidirectional streaming service sessions.
+ */
+final class DeviceProviderRegistryClientProxy
+ extends AbstractProviderRegistry<DeviceProvider, DeviceProviderService>
+ implements DeviceProviderRegistry {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final Channel channel;
+
+ private final Map<DeviceProvider, DeviceProviderServiceClientProxy> pServices;
+
+ DeviceProviderRegistryClientProxy(ManagedChannel channel) {
+ this.channel = channel;
+ pServices = Maps.newIdentityHashMap();
+ }
+
+ @Override
+ protected synchronized DeviceProviderService createProviderService(DeviceProvider provider) {
+
+ // Create session
+ DeviceProviderServiceClientProxy pService = new DeviceProviderServiceClientProxy(provider, channel);
+ log.debug("Created DeviceProviderServiceClientProxy", pService);
+
+ DeviceProviderServiceClientProxy old = pServices.put(provider, pService);
+ if (old != null) {
+ // sanity check, can go away
+ log.warn("Duplicate registration detected for {}", provider.id());
+ }
+ return pService;
+ }
+
+ @Override
+ public synchronized void unregister(DeviceProvider provider) {
+ DeviceProviderServiceClientProxy pService = pServices.remove(provider);
+ log.debug("Unregistering DeviceProviderServiceClientProxy", pService);
+ super.unregister(provider);
+ if (pService != null) {
+ pService.shutdown();
+ }
+ }
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java
new file mode 100644
index 00000000..498011f2
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.stream.Collectors.toList;
+import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
+import static org.onosproject.net.DeviceId.deviceId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.onosproject.grpc.Device.DeviceProviderMsg;
+import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
+import org.onosproject.grpc.Device.IsReachableRequest;
+import org.onosproject.grpc.Device.RoleChanged;
+import org.onosproject.grpc.Device.TriggerProbe;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.provider.AbstractProviderService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+
+// gRPC Client side
+// gRPC wise, this object represents bidirectional streaming service session
+// and deals with outgoing message stream
+/**
+ * DeviceProviderService instance associated with given DeviceProvider.
+ */
+final class DeviceProviderServiceClientProxy
+ extends AbstractProviderService<DeviceProvider>
+ implements DeviceProviderService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final StreamObserver<DeviceProviderServiceMsg> devProvService;
+ private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
+
+ private final Channel channel;
+
+ DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
+ super(provider);
+ this.channel = channel;
+
+ DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
+ log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
+ devProvService = stub.register(new DeviceProviderClientProxy(provider));
+
+ // send initialize message
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setRegisterProvider(builder.getRegisterProviderBuilder()
+ .setProviderScheme(provider.id().scheme())
+ .build());
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void deviceConnected(DeviceId deviceId,
+ DeviceDescription deviceDescription) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
+ .setDeviceId(deviceId.toString())
+ .setDeviceDescription(translate(deviceDescription))
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void deviceDisconnected(DeviceId deviceId) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
+ .setDeviceId(deviceId.toString())
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void updatePorts(DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ List<org.onosproject.grpc.Port.PortDescription> portDescs =
+ portDescriptions.stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList());
+
+ builder.setUpdatePorts(builder.getUpdatePortsBuilder()
+ .setDeviceId(deviceId.toString())
+ .addAllPortDescriptions(portDescs)
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void portStatusChanged(DeviceId deviceId,
+ PortDescription portDescription) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
+ .setDeviceId(deviceId.toString())
+ .setPortDescription(translate(portDescription))
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
+ MastershipRole response) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
+ .setDeviceId(deviceId.toString())
+ .setRequested(translate(requested))
+ .setResponse(translate(response))
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void updatePortStatistics(DeviceId deviceId,
+ Collection<PortStatistics> portStatistics) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ List<org.onosproject.grpc.Port.PortStatistics> portStats =
+ portStatistics.stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList());
+ builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
+ .setDeviceId(deviceId.toString())
+ .addAllPortStatistics(portStats)
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ /**
+ * Shutdown this session.
+ */
+ public void shutdown() {
+ if (hasShutdown.compareAndSet(false, true)) {
+ log.info("Shutting down session over {}", channel.authority());
+ // initiate clean shutdown from client
+ devProvService.onCompleted();
+ invalidate();
+ }
+ }
+
+ /**
+ * Abnormally terminate this session.
+ * @param t error details
+ */
+ public void shutdown(Throwable t) {
+ if (hasShutdown.compareAndSet(false, true)) {
+ log.error("Shutting down session over {}", channel.authority());
+ // initiate abnormal termination from client
+ devProvService.onError(t);
+ invalidate();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("channel", channel.authority())
+ .add("hasShutdown", hasShutdown.get())
+ .toString();
+ }
+
+ // gRPC wise, this object handles incoming message stream
+ /**
+ * Translates DeviceProvider instructions received from RPC to Java calls.
+ */
+ private final class DeviceProviderClientProxy
+ implements StreamObserver<DeviceProviderMsg> {
+
+ private final DeviceProvider provider;
+
+ DeviceProviderClientProxy(DeviceProvider provider) {
+ this.provider = checkNotNull(provider);
+ }
+
+ @Override
+ public void onNext(DeviceProviderMsg msg) {
+ try {
+ log.trace("DeviceProviderClientProxy received: {}", msg);
+ onMethod(msg);
+ } catch (Exception e) {
+ log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
+ // initiate shutdown from client
+ shutdown(e);
+ }
+ }
+
+ /**
+ * Translates received RPC message to {@link DeviceProvider} method calls.
+ * @param msg DeviceProvider message
+ */
+ private void onMethod(DeviceProviderMsg msg) {
+ switch (msg.getMethodCase()) {
+ case TRIGGER_PROBE:
+ TriggerProbe triggerProbe = msg.getTriggerProbe();
+ provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
+ break;
+ case ROLE_CHANGED:
+ RoleChanged roleChanged = msg.getRoleChanged();
+ provider.roleChanged(deviceId(roleChanged.getDeviceId()),
+ translate(roleChanged.getNewRole()));
+ break;
+ case IS_REACHABLE_REQUEST:
+ IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
+ // check if reachable
+ boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
+
+ int xid = isReachableRequest.getXid();
+ // send response back DeviceProviderService channel
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
+ .setXid(xid)
+ .setIsReachable(reachable)
+ .build());
+ devProvService.onNext(builder.build());
+ break;
+
+ case METHOD_NOT_SET:
+ default:
+ log.warn("Unexpected method, ignoring", msg);
+ break;
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("DeviceProviderClientProxy completed");
+ // session terminated from remote
+ // TODO unregister...? how?
+
+ //devProvService.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("DeviceProviderClientProxy#onError", t);
+ // session terminated from remote
+ // TODO unregister...? how?
+ //devProvService.onError(t);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("channel", channel.authority())
+ .toString();
+ }
+ }
+}
+
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java
new file mode 100644
index 00000000..7045b0c2
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onlab.packet.ChassisId;
+import org.onosproject.grpc.Device.DeviceType;
+import org.onosproject.grpc.Port.PortType;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.Port;
+import org.onosproject.net.Port.Type;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DefaultPortStatistics;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.api.client.repackaged.com.google.common.annotations.Beta;
+
+/**
+ * gRPC message conversion related utilities.
+ */
+@Beta
+public final class GrpcDeviceUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(GrpcDeviceUtils.class);
+
+ /**
+ * Translates gRPC enum MastershipRole to ONOS enum.
+ *
+ * @param role mastership role in gRPC enum
+ * @return equivalent in ONOS enum
+ */
+ public static MastershipRole translate(org.onosproject.grpc.Device.MastershipRole role) {
+ switch (role) {
+ case NONE:
+ return MastershipRole.NONE;
+ case MASTER:
+ return MastershipRole.MASTER;
+ case STANDBY:
+ return MastershipRole.STANDBY;
+ case UNRECOGNIZED:
+ log.warn("Unrecognized MastershipRole gRPC message: {}", role);
+ default:
+ return MastershipRole.NONE;
+ }
+ }
+
+ /**
+ * Translates ONOS enum MastershipRole to gRPC enum.
+ *
+ * @param newRole ONOS' mastership role
+ * @return equivalent in gRPC message enum
+ */
+ public static org.onosproject.grpc.Device.MastershipRole translate(MastershipRole newRole) {
+ switch (newRole) {
+ case MASTER:
+ return org.onosproject.grpc.Device.MastershipRole.MASTER;
+ case STANDBY:
+ return org.onosproject.grpc.Device.MastershipRole.STANDBY;
+ case NONE:
+ default:
+ return org.onosproject.grpc.Device.MastershipRole.NONE;
+ }
+ }
+
+
+ /**
+ * Translates gRPC DeviceDescription to {@link DeviceDescription}.
+ *
+ * @param deviceDescription gRPC message
+ * @return {@link DeviceDescription}
+ */
+ public static DeviceDescription translate(org.onosproject.grpc.Device.DeviceDescription deviceDescription) {
+ URI uri = URI.create(deviceDescription.getDeviceUri());
+ Device.Type type = translate(deviceDescription.getType());
+ String manufacturer = deviceDescription.getManufacturer();
+ String hwVersion = deviceDescription.getHwVersion();
+ String swVersion = deviceDescription.getSwVersion();
+ String serialNumber = deviceDescription.getSerialNumber();
+ ChassisId chassis = new ChassisId(deviceDescription.getChassisId());
+ return new DefaultDeviceDescription(uri, type, manufacturer,
+ hwVersion, swVersion, serialNumber,
+ chassis,
+ asAnnotations(deviceDescription.getAnnotations()));
+ }
+
+ /**
+ * Translates {@link DeviceDescription} to gRPC DeviceDescription message.
+ *
+ * @param deviceDescription {@link DeviceDescription}
+ * @return gRPC DeviceDescription message
+ */
+ public static org.onosproject.grpc.Device.DeviceDescription translate(DeviceDescription deviceDescription) {
+
+ return org.onosproject.grpc.Device.DeviceDescription.newBuilder()
+ .setDeviceUri(deviceDescription.deviceUri().toString())
+ .setType(translate(deviceDescription.type()))
+ .setManufacturer(deviceDescription.manufacturer())
+ .setHwVersion(deviceDescription.hwVersion())
+ .setSwVersion(deviceDescription.swVersion())
+ .setSerialNumber(deviceDescription.serialNumber())
+ .setChassisId(deviceDescription.chassisId().toString())
+ .putAllAnnotations(asMap(deviceDescription.annotations()))
+ .build();
+ }
+
+
+ /**
+ * Translates gRPC DeviceType to {@link Device.Type}.
+ *
+ * @param type gRPC message
+ * @return {@link Device.Type}
+ */
+ public static Device.Type translate(org.onosproject.grpc.Device.DeviceType type) {
+ switch (type) {
+ case BALANCER:
+ return Device.Type.BALANCER;
+ case CONTROLLER:
+ return Device.Type.CONTROLLER;
+ case FIBER_SWITCH:
+ return Device.Type.FIBER_SWITCH;
+ case FIREWALL:
+ return Device.Type.FIREWALL;
+ case IDS:
+ return Device.Type.IDS;
+ case IPS:
+ return Device.Type.IPS;
+ case MICROWAVE:
+ return Device.Type.MICROWAVE;
+ case OTHER:
+ return Device.Type.OTHER;
+ case OTN:
+ return Device.Type.OTN;
+ case ROADM:
+ return Device.Type.ROADM;
+ case ROADM_OTN:
+ return Device.Type.ROADM_OTN;
+ case ROUTER:
+ return Device.Type.ROUTER;
+ case SWITCH:
+ return Device.Type.SWITCH;
+ case VIRTUAL:
+ return Device.Type.VIRTUAL;
+
+ case UNRECOGNIZED:
+ default:
+ log.warn("Unexpected DeviceType: {}", type);
+ return Device.Type.OTHER;
+ }
+ }
+
+ /**
+ * Translates {@link Type} to gRPC DeviceType.
+ *
+ * @param type {@link Type}
+ * @return gRPC message
+ */
+ public static DeviceType translate(Device.Type type) {
+ switch (type) {
+ case BALANCER:
+ return DeviceType.BALANCER;
+ case CONTROLLER:
+ return DeviceType.CONTROLLER;
+ case FIBER_SWITCH:
+ return DeviceType.FIBER_SWITCH;
+ case FIREWALL:
+ return DeviceType.FIREWALL;
+ case IDS:
+ return DeviceType.IDS;
+ case IPS:
+ return DeviceType.IPS;
+ case MICROWAVE:
+ return DeviceType.MICROWAVE;
+ case OTHER:
+ return DeviceType.OTHER;
+ case OTN:
+ return DeviceType.OTN;
+ case ROADM:
+ return DeviceType.ROADM;
+ case ROADM_OTN:
+ return DeviceType.ROADM_OTN;
+ case ROUTER:
+ return DeviceType.ROUTER;
+ case SWITCH:
+ return DeviceType.SWITCH;
+ case VIRTUAL:
+ return DeviceType.VIRTUAL;
+
+ default:
+ log.warn("Unexpected Device.Type: {}", type);
+ return DeviceType.OTHER;
+ }
+ }
+
+ /**
+ * Translates gRPC PortDescription message to {@link PortDescription}.
+ *
+ * @param portDescription gRPC message
+ * @return {@link PortDescription}
+ */
+ public static PortDescription translate(org.onosproject.grpc.Port.PortDescription portDescription) {
+ PortNumber number = PortNumber.fromString(portDescription.getPortNumber());
+ boolean isEnabled = portDescription.getIsEnabled();
+ Port.Type type = translate(portDescription.getType());
+ long portSpeed = portDescription.getPortSpeed();
+ SparseAnnotations annotations = asAnnotations(portDescription.getAnnotations());
+ // TODO How to deal with more specific Port...
+ return new DefaultPortDescription(number, isEnabled, type, portSpeed, annotations);
+ }
+
+ /**
+ * Translates {@link PortDescription} to gRPC PortDescription message.
+ *
+ * @param portDescription {@link PortDescription}
+ * @return gRPC PortDescription message
+ */
+ public static org.onosproject.grpc.Port.PortDescription translate(PortDescription portDescription) {
+ // TODO How to deal with more specific Port...
+ return org.onosproject.grpc.Port.PortDescription.newBuilder()
+ .setPortNumber(portDescription.portNumber().toString())
+ .setIsEnabled(portDescription.isEnabled())
+ .setType(translate(portDescription.type()))
+ .setPortSpeed(portDescription.portSpeed())
+ .putAllAnnotations(asMap(portDescription.annotations()))
+ .build();
+ }
+
+ /**
+ * Translates gRPC PortType to {@link Port.Type}.
+ *
+ * @param type gRPC message
+ * @return {@link Port.Type}
+ */
+ public static Port.Type translate(PortType type) {
+ switch (type) {
+ case COPPER:
+ return Type.COPPER;
+ case FIBER:
+ return Type.FIBER;
+ case OCH:
+ return Type.OCH;
+ case ODUCLT:
+ return Type.ODUCLT;
+ case OMS:
+ return Type.OMS;
+ case PACKET:
+ return Type.PACKET;
+ case VIRTUAL:
+ return Type.VIRTUAL;
+
+ case UNRECOGNIZED:
+ default:
+ log.warn("Unexpected PortType: {}", type);
+ return Type.COPPER;
+ }
+ }
+
+ /**
+ * Translates {@link Port.Type} to gRPC PortType.
+ *
+ * @param type {@link Port.Type}
+ * @return gRPC message
+ */
+ public static PortType translate(Port.Type type) {
+ switch (type) {
+ case COPPER:
+ return PortType.COPPER;
+ case FIBER:
+ return PortType.FIBER;
+ case OCH:
+ return PortType.OCH;
+ case ODUCLT:
+ return PortType.ODUCLT;
+ case OMS:
+ return PortType.OMS;
+ case PACKET:
+ return PortType.PACKET;
+ case VIRTUAL:
+ return PortType.VIRTUAL;
+
+ default:
+ log.warn("Unexpected Port.Type: {}", type);
+ return PortType.COPPER;
+ }
+ }
+
+ /**
+ * Translates gRPC PortStatistics message to {@link PortStatistics}.
+ *
+ * @param portStatistics gRPC PortStatistics message
+ * @return {@link PortStatistics}
+ */
+ public static PortStatistics translate(org.onosproject.grpc.Port.PortStatistics portStatistics) {
+ // TODO implement adding missing fields
+ return DefaultPortStatistics.builder()
+ .setPort(portStatistics.getPort())
+ .setPacketsReceived(portStatistics.getPacketsReceived())
+ .setPacketsSent(portStatistics.getPacketsSent())
+ .build();
+ }
+
+ /**
+ * Translates {@link PortStatistics} to gRPC PortStatistics message.
+ *
+ * @param portStatistics {@link PortStatistics}
+ * @return gRPC PortStatistics message
+ */
+ public static org.onosproject.grpc.Port.PortStatistics translate(PortStatistics portStatistics) {
+ // TODO implement adding missing fields
+ return org.onosproject.grpc.Port.PortStatistics.newBuilder()
+ .setPort(portStatistics.port())
+ .setPacketsReceived(portStatistics.packetsReceived())
+ .setPacketsSent(portStatistics.packetsSent())
+ .build();
+ }
+
+ // may be this can be moved to Annotation itself or AnnotationsUtils
+ /**
+ * Converts Annotations to Map of Strings.
+ *
+ * @param annotations {@link Annotations}
+ * @return Map of annotation key and values
+ */
+ public static Map<String, String> asMap(Annotations annotations) {
+ if (annotations instanceof DefaultAnnotations) {
+ return ((DefaultAnnotations) annotations).asMap();
+ }
+ Map<String, String> map = new HashMap<>();
+ annotations.keys()
+ .forEach(k -> map.put(k, annotations.value(k)));
+
+ return map;
+ }
+
+ // may be this can be moved to Annotation itself or AnnotationsUtils
+ /**
+ * Converts Map of Strings to {@link SparseAnnotations}.
+ *
+ * @param annotations Map of annotation key and values
+ * @return {@link SparseAnnotations}
+ */
+ public static SparseAnnotations asAnnotations(Map<String, String> annotations) {
+ DefaultAnnotations.Builder builder = DefaultAnnotations.builder();
+ annotations.entrySet().forEach(e -> {
+ if (e.getValue() != null) {
+ builder.set(e.getKey(), e.getValue());
+ } else {
+ builder.remove(e.getKey());
+ }
+ });
+ return builder.build();
+ }
+
+ // Utility class not intended for instantiation.
+ private GrpcDeviceUtils() {}
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java
new file mode 100644
index 00000000..b419a346
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.onosproject.incubator.rpc.RemoteServiceContext;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+
+import io.grpc.ManagedChannel;
+
+// gRPC Client side
+// Probably there should be plug-in mechanism in the future.
+/**
+ * RemoteServiceContext based on gRPC.
+ *
+ * <p>
+ * Currently it supports {@link DeviceProviderRegistry}.
+ */
+public class GrpcRemoteServiceContext implements RemoteServiceContext {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final Map<Class<? extends Object>, Object> services = new ConcurrentHashMap<>();
+
+ private final ManagedChannel channel;
+
+ public GrpcRemoteServiceContext(ManagedChannel channel) {
+ this.channel = checkNotNull(channel);
+ services.put(DeviceProviderRegistry.class, new DeviceProviderRegistryClientProxy(channel));
+ }
+
+
+ @Override
+ public <T> T get(Class<T> serviceClass) {
+ @SuppressWarnings("unchecked")
+ T service = (T) services.get(serviceClass);
+ if (service != null) {
+ return service;
+ }
+ log.error("{} not supported", serviceClass);
+ throw new NoSuchElementException(serviceClass.getTypeName() + " not supported");
+ }
+
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("services", services.keySet())
+ .add("channel", channel.authority())
+ .toString();
+ }
+
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java
new file mode 100644
index 00000000..74962187
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.incubator.rpc.RemoteServiceContext;
+import org.onosproject.incubator.rpc.RemoteServiceContextProvider;
+import org.onosproject.incubator.rpc.RemoteServiceContextProviderService;
+import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry;
+import org.onosproject.net.provider.ProviderId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+
+// gRPC Client side
+/**
+ * RemoteServiceContextProvider based on gRPC.
+ */
+@Component(immediate = true)
+public class GrpcRemoteServiceProvider implements RemoteServiceContextProvider {
+
+ public static final String GRPC_SCHEME = "grpc";
+
+ public static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
+
+ private static final ProviderId PID = new ProviderId(GRPC_SCHEME, RPC_PROVIDER_NAME);
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RemoteServiceProviderRegistry rpcRegistry;
+
+ private Map<URI, ManagedChannel> channels = new ConcurrentHashMap<>();
+
+ private RemoteServiceContextProviderService providerService;
+
+
+ @Activate
+ protected void activate() {
+ providerService = rpcRegistry.register(this);
+
+ // FIXME remove me. test code to see if gRPC loads in karaf
+ //getChannel(URI.create("grpc://localhost:8080"));
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ rpcRegistry.unregister(this);
+
+ // shutdown all channels
+ channels.values().stream()
+ .forEach(ManagedChannel::shutdown);
+ // Should we wait for shutdown? How?
+ channels.clear();
+ log.info("Stopped");
+ }
+
+ @Override
+ public ProviderId id() {
+ return PID;
+ }
+
+ @Override
+ public RemoteServiceContext get(URI uri) {
+ // Create gRPC client
+ return new GrpcRemoteServiceContext(getChannel(uri));
+ }
+
+ private ManagedChannel getChannel(URI uri) {
+ checkArgument(Objects.equals(GRPC_SCHEME, uri.getScheme()),
+ "Invalid URI scheme: %s", uri.getScheme());
+
+ return channels.compute(uri, (u, ch) -> {
+ if (ch != null && !ch.isShutdown()) {
+ return ch;
+ } else {
+ return createChannel(u);
+ }
+ });
+ }
+
+ private ManagedChannel createChannel(URI uri) {
+ log.debug("Creating channel for {}", uri);
+ return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort())
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .build();
+ }
+
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
new file mode 100644
index 00000000..4f43fa65
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
@@ -0,0 +1,385 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.stream.Collectors.toList;
+import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
+import static org.onosproject.net.DeviceId.deviceId;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.grpc.Device.DeviceConnected;
+import org.onosproject.grpc.Device.DeviceDisconnected;
+import org.onosproject.grpc.Device.DeviceProviderMsg;
+import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
+import org.onosproject.grpc.Device.IsReachableResponse;
+import org.onosproject.grpc.Device.PortStatusChanged;
+import org.onosproject.grpc.Device.ReceivedRoleReply;
+import org.onosproject.grpc.Device.RegisterProvider;
+import org.onosproject.grpc.Device.UpdatePortStatistics;
+import org.onosproject.grpc.Device.UpdatePorts;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.ProviderId;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+// gRPC Server on Metro-side
+// Translates request received on RPC channel, and calls corresponding Service on
+// Metro-ONOS cluster.
+/**
+ * Server side implementation of gRPC based RemoteService.
+ */
+@Component(immediate = true)
+public class GrpcRemoteServiceServer {
+
+ private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
+
+ // TODO pick a number
+ public static final int DEFAULT_LISTEN_PORT = 11984;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceProviderRegistry deviceProviderRegistry;
+
+
+ @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
+ label = "Port to listen on")
+ protected int listenPort = DEFAULT_LISTEN_PORT;
+
+ private Server server;
+ private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
+
+ @Activate
+ protected void activate(ComponentContext context) throws IOException {
+ modified(context);
+
+ log.debug("Server starting on {}", listenPort);
+ try {
+ server = NettyServerBuilder.forPort(listenPort)
+ .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
+ .build().start();
+ } catch (IOException e) {
+ log.error("Failed to start gRPC server", e);
+ throw e;
+ }
+
+ log.info("Started on {}", listenPort);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+
+ registeredProviders.stream()
+ .forEach(deviceProviderRegistry::unregister);
+
+ server.shutdown();
+ // Should we wait for shutdown?
+ log.info("Stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ // TODO support dynamic reconfiguration and restarting server?
+ }
+
+ // RPC Server-side code
+ // RPC session Factory
+ /**
+ * Relays DeviceProviderRegistry calls from RPC client.
+ */
+ class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
+
+ @Override
+ public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ log.trace("DeviceProviderRegistryServerProxy#register called!");
+
+ DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
+
+ return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
+ }
+ }
+
+ // Lower -> Upper Controller message
+ // RPC Server-side code
+ // RPC session handler
+ private final class DeviceProviderServiceServerProxy
+ implements StreamObserver<DeviceProviderServiceMsg> {
+
+ // intentionally shadowing
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final DeviceProviderServerProxy pairedProvider;
+ private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+ private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
+
+ // wrapped providerService
+ private DeviceProviderService deviceProviderService;
+
+
+ DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
+ StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ this.pairedProvider = provider;
+ this.toDeviceProvider = toDeviceProvider;
+ outstandingIsReachable = CacheBuilder.newBuilder()
+ .expireAfterWrite(1, TimeUnit.MINUTES)
+ .build();
+
+ // pair RPC session in other direction
+ provider.pair(this);
+ }
+
+ @Override
+ public void onNext(DeviceProviderServiceMsg msg) {
+ try {
+ log.trace("DeviceProviderServiceServerProxy received: {}", msg);
+ onMethod(msg);
+ } catch (Exception e) {
+ log.error("Exception thrown handling {}", msg, e);
+ onError(e);
+ throw e;
+ }
+ }
+
+ /**
+ * Translates received RPC message to {@link DeviceProviderService} method calls.
+ * @param msg DeviceProviderService message
+ */
+ private void onMethod(DeviceProviderServiceMsg msg) {
+ switch (msg.getMethodCase()) {
+ case REGISTER_PROVIDER:
+ RegisterProvider registerProvider = msg.getRegisterProvider();
+ // TODO Do we care about provider name?
+ pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
+ registeredProviders.add(pairedProvider);
+ deviceProviderService = deviceProviderRegistry.register(pairedProvider);
+ break;
+
+ case DEVICE_CONNECTED:
+ DeviceConnected deviceConnected = msg.getDeviceConnected();
+ deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
+ translate(deviceConnected.getDeviceDescription()));
+ break;
+ case DEVICE_DISCONNECTED:
+ DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
+ deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
+ break;
+ case UPDATE_PORTS:
+ UpdatePorts updatePorts = msg.getUpdatePorts();
+ deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
+ updatePorts.getPortDescriptionsList()
+ .stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList()));
+ break;
+ case PORT_STATUS_CHANGED:
+ PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
+ deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
+ translate(portStatusChanged.getPortDescription()));
+ break;
+ case RECEIVED_ROLE_REPLY:
+ ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
+ deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
+ translate(receivedRoleReply.getRequested()),
+ translate(receivedRoleReply.getResponse()));
+ break;
+ case UPDATE_PORT_STATISTICS:
+ UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
+ deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
+ updatePortStatistics.getPortStatisticsList()
+ .stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList()));
+ break;
+
+ // return value of DeviceProvider#isReachable
+ case IS_REACHABLE_RESPONSE:
+ IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
+ int xid = isReachableResponse.getXid();
+ boolean isReachable = isReachableResponse.getIsReachable();
+ CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
+ if (result != null) {
+ result.complete(isReachable);
+ }
+ break;
+
+ case METHOD_NOT_SET:
+ default:
+ log.warn("Unexpected message received {}", msg);
+ break;
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("DeviceProviderServiceServerProxy completed");
+ deviceProviderRegistry.unregister(pairedProvider);
+ registeredProviders.remove(pairedProvider);
+ toDeviceProvider.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ log.error("DeviceProviderServiceServerProxy#onError", e);
+ deviceProviderRegistry.unregister(pairedProvider);
+ registeredProviders.remove(pairedProvider);
+ // TODO What is the proper clean up for bi-di stream on error?
+ // sample suggests no-op
+ toDeviceProvider.onError(e);
+ }
+
+
+ /**
+ * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
+ * @param xid IsReachable call ID.
+ * @param reply Future to
+ */
+ void register(int xid, CompletableFuture<Boolean> reply) {
+ outstandingIsReachable.put(xid, reply);
+ }
+
+ }
+
+ // Upper -> Lower Controller message
+ /**
+ * Relay DeviceProvider calls to RPC client.
+ */
+ private final class DeviceProviderServerProxy
+ implements DeviceProvider {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ // xid for isReachable calls
+ private final AtomicInteger xidPool = new AtomicInteger();
+ private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+ private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
+ private ProviderId providerId;
+
+ DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ this.toDeviceProvider = toDeviceProvider;
+ }
+
+ void setProviderId(ProviderId pid) {
+ this.providerId = pid;
+ }
+
+ /**
+ * Registers RPC stream in other direction.
+ * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
+ */
+ void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
+ this.deviceProviderServiceProxy = deviceProviderServiceProxy;
+ }
+
+ @Override
+ public void triggerProbe(DeviceId deviceId) {
+ log.trace("triggerProbe({})", deviceId);
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
+ .setDeviceId(deviceId.toString())
+ .build());
+ DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
+ toDeviceProvider.onNext(triggerProbeMsg);
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+ log.trace("roleChanged({}, {})", deviceId, newRole);
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
+ .setDeviceId(deviceId.toString())
+ .setNewRole(translate(newRole))
+ .build());
+ toDeviceProvider.onNext(msgBuilder.build());
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public boolean isReachable(DeviceId deviceId) {
+ log.trace("isReachable({})", deviceId);
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
+ final int xid = xidPool.incrementAndGet();
+
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
+ .setXid(xid)
+ .setDeviceId(deviceId.toString())
+ .build());
+
+ // Associate xid and register above future some where
+ // in DeviceProviderService channel to receive reply
+ if (deviceProviderServiceProxy != null) {
+ deviceProviderServiceProxy.register(xid, result);
+ }
+
+ // send message down RPC
+ toDeviceProvider.onNext(msgBuilder.build());
+
+ // wait for reply
+ try {
+ return result.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.debug("isReachable({}) was Interrupted", deviceId, e);
+ Thread.currentThread().interrupt();
+ } catch (TimeoutException e) {
+ log.warn("isReachable({}) Timed out", deviceId, e);
+ } catch (ExecutionException e) {
+ log.error("isReachable({}) Execution failed", deviceId, e);
+ // close session?
+ }
+ return false;
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public ProviderId id() {
+ return checkNotNull(providerId, "not initialized yet");
+ }
+
+ }
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java
new file mode 100644
index 00000000..d667ea77
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC based RemoteServiceProvider implementation.
+ */
+package org.onosproject.incubator.rpc.grpc;