From e63291850fd0795c5700e25e67e5dee89ba54c5f Mon Sep 17 00:00:00 2001 From: Ashlee Young Date: Tue, 1 Dec 2015 05:49:27 -0800 Subject: onos commit hash c2999f30c69e50df905a9d175ef80b3f23a98514 Change-Id: I2bb8562c4942b6d6a6d60b663db2e17540477b81 Signed-off-by: Ashlee Young --- framework/src/onos/incubator/rpc-grpc/features.xml | 37 ++ framework/src/onos/incubator/rpc-grpc/pom.xml | 269 ++++++++++++++ .../grpc/DeviceProviderRegistryClientProxy.java | 77 ++++ .../rpc/grpc/DeviceProviderServiceClientProxy.java | 295 +++++++++++++++ .../incubator/rpc/grpc/GrpcDeviceUtils.java | 381 ++++++++++++++++++++ .../rpc/grpc/GrpcRemoteServiceContext.java | 75 ++++ .../rpc/grpc/GrpcRemoteServiceProvider.java | 119 ++++++ .../rpc/grpc/GrpcRemoteServiceServer.java | 385 ++++++++++++++++++++ .../incubator/rpc/grpc/package-info.java | 20 ++ .../incubator/rpc-grpc/src/main/proto/Device.proto | 131 +++++++ .../incubator/rpc-grpc/src/main/proto/Port.proto | 40 +++ .../incubator/rpc/grpc/GrpcRemoteServiceTest.java | 398 +++++++++++++++++++++ 12 files changed, 2227 insertions(+) create mode 100644 framework/src/onos/incubator/rpc-grpc/features.xml create mode 100644 framework/src/onos/incubator/rpc-grpc/pom.xml create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto create mode 100644 framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto create mode 100644 framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java (limited to 'framework/src/onos/incubator/rpc-grpc') diff --git a/framework/src/onos/incubator/rpc-grpc/features.xml b/framework/src/onos/incubator/rpc-grpc/features.xml new file mode 100644 index 00000000..df768fbb --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/features.xml @@ -0,0 +1,37 @@ + + + + + onos-api + mvn:com.google.protobuf/protobuf-java/3.0.0-beta-1 + mvn:io.netty/netty-common/4.1.0.Beta6 + mvn:io.netty/netty-buffer/4.1.0.Beta6 + mvn:io.netty/netty-transport/4.1.0.Beta6 + mvn:io.netty/netty-handler/4.1.0.Beta6 + mvn:io.netty/netty-codec/4.1.0.Beta6 + mvn:io.netty/netty-codec-http/4.1.0.Beta6 + mvn:io.netty/netty-codec-http2/4.1.0.Beta6 + mvn:io.netty/netty-resolver/4.1.0.Beta6 + mvn:com.twitter/hpack/0.11.0 + + wrap:mvn:com.google.auth/google-auth-library-credentials/0.3.0$Bundle-SymbolicName=com.google.auth.google-auth-library-credentials&Bundle-Version=0.3.0 + wrap:mvn:com.google.auth/google-auth-library-oauth2-http/0.3.0$Bundle-SymbolicName=com.google.auth.google-auth-library-oauth2-http&Bundle-Version=0.3.0 + wrap:mvn:io.grpc/grpc-all/0.9.0$Bundle-SymbolicName=io.grpc.grpc-all&Bundle-Version=0.9.0&Import-Package=io.netty.*;version=4.1.0.Beta6,javax.net.ssl,com.google.protobuf.nano;resolution:=optional,okio;resolution:=optional,* + mvn:${project.groupId}/${project.artifactId}/${project.version} + + diff --git a/framework/src/onos/incubator/rpc-grpc/pom.xml b/framework/src/onos/incubator/rpc-grpc/pom.xml new file mode 100644 index 00000000..f528ca53 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/pom.xml @@ -0,0 +1,269 @@ + + + + 4.0.0 + + onos-incubator + org.onosproject + 1.4.0-SNAPSHOT + + + onos-incubator-rpc-grpc + bundle + + ONOS inter-cluster RPC based on gRPC + http://onosproject.org + + + org.onosproject.incubator.rpc.grpc + org.onosproject.incubator.rpc + + 0.9.0 + 4.1.0.Beta6 + + + + + protoc-plugin + https://dl.bintray.com/sergei-ivanov/maven/ + + + + + + + org.onosproject + onos-api + + + + org.onosproject + onos-incubator-api + + + + org.onosproject + onlab-osgi + + + + io.grpc + grpc-core + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-netty + ${grpc.version} + + + io.grpc + grpc-auth + ${grpc.version} + + + + junit + junit + test + + + + org.onosproject + onos-api + test + tests + + + + org.apache.felix + org.apache.felix.scr.annotations + provided + + + + + + + kr.motd.maven + os-maven-plugin + 1.4.0.Final + + + + + + org.apache.felix + maven-bundle-plugin + true + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + org.apache.felix + maven-scr-plugin + + + generate-scr-srcdescriptor + + scr + + + + + + true + + bundle + war + + + + + org.onosproject + onos-maven-plugin + + + cfg + generate-resources + + cfg + + + + swagger + generate-sources + + swagger + + + + app + package + + app + + + + + + + com.google.protobuf.tools + maven-protoc-plugin + 0.4.2 + + + com.google.protobuf:protoc:3.0.0-beta-1:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.9.1 + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/protobuf/java + ${project.build.directory}/generated-sources/protobuf/grpc-java + + + + + + + + + + + + + + io.netty + netty-codec + ${grpc.netty.version} + + + io.netty + netty-transport + ${grpc.netty.version} + + + io.netty + netty-handler + ${grpc.netty.version} + + + io.netty + netty-buffer + ${grpc.netty.version} + + + io.netty + netty-common + ${grpc.netty.version} + + + com.twitter + hpack + + 0.11.0 + + + + + diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java new file mode 100644 index 00000000..cad0fbb6 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java @@ -0,0 +1,77 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import java.util.Map; + +import org.onosproject.net.device.DeviceProvider; +import org.onosproject.net.device.DeviceProviderRegistry; +import org.onosproject.net.device.DeviceProviderService; +import org.onosproject.net.provider.AbstractProviderRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +import io.grpc.Channel; +import io.grpc.ManagedChannel; + +// gRPC Client side +/** + * Proxy object to handle DeviceProviderRegistry calls. + * + * RPC wise, this will start/stop bidirectional streaming service sessions. + */ +final class DeviceProviderRegistryClientProxy + extends AbstractProviderRegistry + implements DeviceProviderRegistry { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final Channel channel; + + private final Map pServices; + + DeviceProviderRegistryClientProxy(ManagedChannel channel) { + this.channel = channel; + pServices = Maps.newIdentityHashMap(); + } + + @Override + protected synchronized DeviceProviderService createProviderService(DeviceProvider provider) { + + // Create session + DeviceProviderServiceClientProxy pService = new DeviceProviderServiceClientProxy(provider, channel); + log.debug("Created DeviceProviderServiceClientProxy", pService); + + DeviceProviderServiceClientProxy old = pServices.put(provider, pService); + if (old != null) { + // sanity check, can go away + log.warn("Duplicate registration detected for {}", provider.id()); + } + return pService; + } + + @Override + public synchronized void unregister(DeviceProvider provider) { + DeviceProviderServiceClientProxy pService = pServices.remove(provider); + log.debug("Unregistering DeviceProviderServiceClientProxy", pService); + super.unregister(provider); + if (pService != null) { + pService.shutdown(); + } + } +} diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java new file mode 100644 index 00000000..498011f2 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java @@ -0,0 +1,295 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.stream.Collectors.toList; +import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate; +import static org.onosproject.net.DeviceId.deviceId; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.onosproject.grpc.Device.DeviceProviderMsg; +import org.onosproject.grpc.Device.DeviceProviderServiceMsg; +import org.onosproject.grpc.Device.IsReachableRequest; +import org.onosproject.grpc.Device.RoleChanged; +import org.onosproject.grpc.Device.TriggerProbe; +import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc; +import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub; +import org.onosproject.net.DeviceId; +import org.onosproject.net.MastershipRole; +import org.onosproject.net.device.DeviceDescription; +import org.onosproject.net.device.DeviceProvider; +import org.onosproject.net.device.DeviceProviderService; +import org.onosproject.net.device.PortDescription; +import org.onosproject.net.device.PortStatistics; +import org.onosproject.net.provider.AbstractProviderService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.MoreObjects; + +import io.grpc.Channel; +import io.grpc.stub.StreamObserver; + +// gRPC Client side +// gRPC wise, this object represents bidirectional streaming service session +// and deals with outgoing message stream +/** + * DeviceProviderService instance associated with given DeviceProvider. + */ +final class DeviceProviderServiceClientProxy + extends AbstractProviderService + implements DeviceProviderService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final StreamObserver devProvService; + private final AtomicBoolean hasShutdown = new AtomicBoolean(false); + + private final Channel channel; + + DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) { + super(provider); + this.channel = channel; + + DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel); + log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority()); + devProvService = stub.register(new DeviceProviderClientProxy(provider)); + + // send initialize message + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + builder.setRegisterProvider(builder.getRegisterProviderBuilder() + .setProviderScheme(provider.id().scheme()) + .build()); + devProvService.onNext(builder.build()); + } + + @Override + public void deviceConnected(DeviceId deviceId, + DeviceDescription deviceDescription) { + checkValidity(); + + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + builder.setDeviceConnected(builder.getDeviceConnectedBuilder() + .setDeviceId(deviceId.toString()) + .setDeviceDescription(translate(deviceDescription)) + .build()); + + devProvService.onNext(builder.build()); + } + + @Override + public void deviceDisconnected(DeviceId deviceId) { + checkValidity(); + + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder() + .setDeviceId(deviceId.toString()) + .build()); + + devProvService.onNext(builder.build()); + } + + @Override + public void updatePorts(DeviceId deviceId, + List portDescriptions) { + checkValidity(); + + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + List portDescs = + portDescriptions.stream() + .map(GrpcDeviceUtils::translate) + .collect(toList()); + + builder.setUpdatePorts(builder.getUpdatePortsBuilder() + .setDeviceId(deviceId.toString()) + .addAllPortDescriptions(portDescs) + .build()); + + devProvService.onNext(builder.build()); + } + + @Override + public void portStatusChanged(DeviceId deviceId, + PortDescription portDescription) { + checkValidity(); + + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + builder.setPortStatusChanged(builder.getPortStatusChangedBuilder() + .setDeviceId(deviceId.toString()) + .setPortDescription(translate(portDescription)) + .build()); + + devProvService.onNext(builder.build()); + } + + @Override + public void receivedRoleReply(DeviceId deviceId, MastershipRole requested, + MastershipRole response) { + checkValidity(); + + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder() + .setDeviceId(deviceId.toString()) + .setRequested(translate(requested)) + .setResponse(translate(response)) + .build()); + + devProvService.onNext(builder.build()); + } + + @Override + public void updatePortStatistics(DeviceId deviceId, + Collection portStatistics) { + checkValidity(); + + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + List portStats = + portStatistics.stream() + .map(GrpcDeviceUtils::translate) + .collect(toList()); + builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder() + .setDeviceId(deviceId.toString()) + .addAllPortStatistics(portStats) + .build()); + + devProvService.onNext(builder.build()); + } + + /** + * Shutdown this session. + */ + public void shutdown() { + if (hasShutdown.compareAndSet(false, true)) { + log.info("Shutting down session over {}", channel.authority()); + // initiate clean shutdown from client + devProvService.onCompleted(); + invalidate(); + } + } + + /** + * Abnormally terminate this session. + * @param t error details + */ + public void shutdown(Throwable t) { + if (hasShutdown.compareAndSet(false, true)) { + log.error("Shutting down session over {}", channel.authority()); + // initiate abnormal termination from client + devProvService.onError(t); + invalidate(); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("channel", channel.authority()) + .add("hasShutdown", hasShutdown.get()) + .toString(); + } + + // gRPC wise, this object handles incoming message stream + /** + * Translates DeviceProvider instructions received from RPC to Java calls. + */ + private final class DeviceProviderClientProxy + implements StreamObserver { + + private final DeviceProvider provider; + + DeviceProviderClientProxy(DeviceProvider provider) { + this.provider = checkNotNull(provider); + } + + @Override + public void onNext(DeviceProviderMsg msg) { + try { + log.trace("DeviceProviderClientProxy received: {}", msg); + onMethod(msg); + } catch (Exception e) { + log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e); + // initiate shutdown from client + shutdown(e); + } + } + + /** + * Translates received RPC message to {@link DeviceProvider} method calls. + * @param msg DeviceProvider message + */ + private void onMethod(DeviceProviderMsg msg) { + switch (msg.getMethodCase()) { + case TRIGGER_PROBE: + TriggerProbe triggerProbe = msg.getTriggerProbe(); + provider.triggerProbe(deviceId(triggerProbe.getDeviceId())); + break; + case ROLE_CHANGED: + RoleChanged roleChanged = msg.getRoleChanged(); + provider.roleChanged(deviceId(roleChanged.getDeviceId()), + translate(roleChanged.getNewRole())); + break; + case IS_REACHABLE_REQUEST: + IsReachableRequest isReachableRequest = msg.getIsReachableRequest(); + // check if reachable + boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId())); + + int xid = isReachableRequest.getXid(); + // send response back DeviceProviderService channel + DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder(); + builder.setIsReachableResponse(builder.getIsReachableResponseBuilder() + .setXid(xid) + .setIsReachable(reachable) + .build()); + devProvService.onNext(builder.build()); + break; + + case METHOD_NOT_SET: + default: + log.warn("Unexpected method, ignoring", msg); + break; + } + } + + @Override + public void onCompleted() { + log.info("DeviceProviderClientProxy completed"); + // session terminated from remote + // TODO unregister...? how? + + //devProvService.onCompleted(); + } + + @Override + public void onError(Throwable t) { + log.error("DeviceProviderClientProxy#onError", t); + // session terminated from remote + // TODO unregister...? how? + //devProvService.onError(t); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("channel", channel.authority()) + .toString(); + } + } +} + diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java new file mode 100644 index 00000000..7045b0c2 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java @@ -0,0 +1,381 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.onlab.packet.ChassisId; +import org.onosproject.grpc.Device.DeviceType; +import org.onosproject.grpc.Port.PortType; +import org.onosproject.net.Annotations; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.Device; +import org.onosproject.net.MastershipRole; +import org.onosproject.net.Port; +import org.onosproject.net.Port.Type; +import org.onosproject.net.PortNumber; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.device.DefaultDeviceDescription; +import org.onosproject.net.device.DefaultPortDescription; +import org.onosproject.net.device.DefaultPortStatistics; +import org.onosproject.net.device.DeviceDescription; +import org.onosproject.net.device.PortDescription; +import org.onosproject.net.device.PortStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.api.client.repackaged.com.google.common.annotations.Beta; + +/** + * gRPC message conversion related utilities. + */ +@Beta +public final class GrpcDeviceUtils { + + private static final Logger log = LoggerFactory.getLogger(GrpcDeviceUtils.class); + + /** + * Translates gRPC enum MastershipRole to ONOS enum. + * + * @param role mastership role in gRPC enum + * @return equivalent in ONOS enum + */ + public static MastershipRole translate(org.onosproject.grpc.Device.MastershipRole role) { + switch (role) { + case NONE: + return MastershipRole.NONE; + case MASTER: + return MastershipRole.MASTER; + case STANDBY: + return MastershipRole.STANDBY; + case UNRECOGNIZED: + log.warn("Unrecognized MastershipRole gRPC message: {}", role); + default: + return MastershipRole.NONE; + } + } + + /** + * Translates ONOS enum MastershipRole to gRPC enum. + * + * @param newRole ONOS' mastership role + * @return equivalent in gRPC message enum + */ + public static org.onosproject.grpc.Device.MastershipRole translate(MastershipRole newRole) { + switch (newRole) { + case MASTER: + return org.onosproject.grpc.Device.MastershipRole.MASTER; + case STANDBY: + return org.onosproject.grpc.Device.MastershipRole.STANDBY; + case NONE: + default: + return org.onosproject.grpc.Device.MastershipRole.NONE; + } + } + + + /** + * Translates gRPC DeviceDescription to {@link DeviceDescription}. + * + * @param deviceDescription gRPC message + * @return {@link DeviceDescription} + */ + public static DeviceDescription translate(org.onosproject.grpc.Device.DeviceDescription deviceDescription) { + URI uri = URI.create(deviceDescription.getDeviceUri()); + Device.Type type = translate(deviceDescription.getType()); + String manufacturer = deviceDescription.getManufacturer(); + String hwVersion = deviceDescription.getHwVersion(); + String swVersion = deviceDescription.getSwVersion(); + String serialNumber = deviceDescription.getSerialNumber(); + ChassisId chassis = new ChassisId(deviceDescription.getChassisId()); + return new DefaultDeviceDescription(uri, type, manufacturer, + hwVersion, swVersion, serialNumber, + chassis, + asAnnotations(deviceDescription.getAnnotations())); + } + + /** + * Translates {@link DeviceDescription} to gRPC DeviceDescription message. + * + * @param deviceDescription {@link DeviceDescription} + * @return gRPC DeviceDescription message + */ + public static org.onosproject.grpc.Device.DeviceDescription translate(DeviceDescription deviceDescription) { + + return org.onosproject.grpc.Device.DeviceDescription.newBuilder() + .setDeviceUri(deviceDescription.deviceUri().toString()) + .setType(translate(deviceDescription.type())) + .setManufacturer(deviceDescription.manufacturer()) + .setHwVersion(deviceDescription.hwVersion()) + .setSwVersion(deviceDescription.swVersion()) + .setSerialNumber(deviceDescription.serialNumber()) + .setChassisId(deviceDescription.chassisId().toString()) + .putAllAnnotations(asMap(deviceDescription.annotations())) + .build(); + } + + + /** + * Translates gRPC DeviceType to {@link Device.Type}. + * + * @param type gRPC message + * @return {@link Device.Type} + */ + public static Device.Type translate(org.onosproject.grpc.Device.DeviceType type) { + switch (type) { + case BALANCER: + return Device.Type.BALANCER; + case CONTROLLER: + return Device.Type.CONTROLLER; + case FIBER_SWITCH: + return Device.Type.FIBER_SWITCH; + case FIREWALL: + return Device.Type.FIREWALL; + case IDS: + return Device.Type.IDS; + case IPS: + return Device.Type.IPS; + case MICROWAVE: + return Device.Type.MICROWAVE; + case OTHER: + return Device.Type.OTHER; + case OTN: + return Device.Type.OTN; + case ROADM: + return Device.Type.ROADM; + case ROADM_OTN: + return Device.Type.ROADM_OTN; + case ROUTER: + return Device.Type.ROUTER; + case SWITCH: + return Device.Type.SWITCH; + case VIRTUAL: + return Device.Type.VIRTUAL; + + case UNRECOGNIZED: + default: + log.warn("Unexpected DeviceType: {}", type); + return Device.Type.OTHER; + } + } + + /** + * Translates {@link Type} to gRPC DeviceType. + * + * @param type {@link Type} + * @return gRPC message + */ + public static DeviceType translate(Device.Type type) { + switch (type) { + case BALANCER: + return DeviceType.BALANCER; + case CONTROLLER: + return DeviceType.CONTROLLER; + case FIBER_SWITCH: + return DeviceType.FIBER_SWITCH; + case FIREWALL: + return DeviceType.FIREWALL; + case IDS: + return DeviceType.IDS; + case IPS: + return DeviceType.IPS; + case MICROWAVE: + return DeviceType.MICROWAVE; + case OTHER: + return DeviceType.OTHER; + case OTN: + return DeviceType.OTN; + case ROADM: + return DeviceType.ROADM; + case ROADM_OTN: + return DeviceType.ROADM_OTN; + case ROUTER: + return DeviceType.ROUTER; + case SWITCH: + return DeviceType.SWITCH; + case VIRTUAL: + return DeviceType.VIRTUAL; + + default: + log.warn("Unexpected Device.Type: {}", type); + return DeviceType.OTHER; + } + } + + /** + * Translates gRPC PortDescription message to {@link PortDescription}. + * + * @param portDescription gRPC message + * @return {@link PortDescription} + */ + public static PortDescription translate(org.onosproject.grpc.Port.PortDescription portDescription) { + PortNumber number = PortNumber.fromString(portDescription.getPortNumber()); + boolean isEnabled = portDescription.getIsEnabled(); + Port.Type type = translate(portDescription.getType()); + long portSpeed = portDescription.getPortSpeed(); + SparseAnnotations annotations = asAnnotations(portDescription.getAnnotations()); + // TODO How to deal with more specific Port... + return new DefaultPortDescription(number, isEnabled, type, portSpeed, annotations); + } + + /** + * Translates {@link PortDescription} to gRPC PortDescription message. + * + * @param portDescription {@link PortDescription} + * @return gRPC PortDescription message + */ + public static org.onosproject.grpc.Port.PortDescription translate(PortDescription portDescription) { + // TODO How to deal with more specific Port... + return org.onosproject.grpc.Port.PortDescription.newBuilder() + .setPortNumber(portDescription.portNumber().toString()) + .setIsEnabled(portDescription.isEnabled()) + .setType(translate(portDescription.type())) + .setPortSpeed(portDescription.portSpeed()) + .putAllAnnotations(asMap(portDescription.annotations())) + .build(); + } + + /** + * Translates gRPC PortType to {@link Port.Type}. + * + * @param type gRPC message + * @return {@link Port.Type} + */ + public static Port.Type translate(PortType type) { + switch (type) { + case COPPER: + return Type.COPPER; + case FIBER: + return Type.FIBER; + case OCH: + return Type.OCH; + case ODUCLT: + return Type.ODUCLT; + case OMS: + return Type.OMS; + case PACKET: + return Type.PACKET; + case VIRTUAL: + return Type.VIRTUAL; + + case UNRECOGNIZED: + default: + log.warn("Unexpected PortType: {}", type); + return Type.COPPER; + } + } + + /** + * Translates {@link Port.Type} to gRPC PortType. + * + * @param type {@link Port.Type} + * @return gRPC message + */ + public static PortType translate(Port.Type type) { + switch (type) { + case COPPER: + return PortType.COPPER; + case FIBER: + return PortType.FIBER; + case OCH: + return PortType.OCH; + case ODUCLT: + return PortType.ODUCLT; + case OMS: + return PortType.OMS; + case PACKET: + return PortType.PACKET; + case VIRTUAL: + return PortType.VIRTUAL; + + default: + log.warn("Unexpected Port.Type: {}", type); + return PortType.COPPER; + } + } + + /** + * Translates gRPC PortStatistics message to {@link PortStatistics}. + * + * @param portStatistics gRPC PortStatistics message + * @return {@link PortStatistics} + */ + public static PortStatistics translate(org.onosproject.grpc.Port.PortStatistics portStatistics) { + // TODO implement adding missing fields + return DefaultPortStatistics.builder() + .setPort(portStatistics.getPort()) + .setPacketsReceived(portStatistics.getPacketsReceived()) + .setPacketsSent(portStatistics.getPacketsSent()) + .build(); + } + + /** + * Translates {@link PortStatistics} to gRPC PortStatistics message. + * + * @param portStatistics {@link PortStatistics} + * @return gRPC PortStatistics message + */ + public static org.onosproject.grpc.Port.PortStatistics translate(PortStatistics portStatistics) { + // TODO implement adding missing fields + return org.onosproject.grpc.Port.PortStatistics.newBuilder() + .setPort(portStatistics.port()) + .setPacketsReceived(portStatistics.packetsReceived()) + .setPacketsSent(portStatistics.packetsSent()) + .build(); + } + + // may be this can be moved to Annotation itself or AnnotationsUtils + /** + * Converts Annotations to Map of Strings. + * + * @param annotations {@link Annotations} + * @return Map of annotation key and values + */ + public static Map asMap(Annotations annotations) { + if (annotations instanceof DefaultAnnotations) { + return ((DefaultAnnotations) annotations).asMap(); + } + Map map = new HashMap<>(); + annotations.keys() + .forEach(k -> map.put(k, annotations.value(k))); + + return map; + } + + // may be this can be moved to Annotation itself or AnnotationsUtils + /** + * Converts Map of Strings to {@link SparseAnnotations}. + * + * @param annotations Map of annotation key and values + * @return {@link SparseAnnotations} + */ + public static SparseAnnotations asAnnotations(Map annotations) { + DefaultAnnotations.Builder builder = DefaultAnnotations.builder(); + annotations.entrySet().forEach(e -> { + if (e.getValue() != null) { + builder.set(e.getKey(), e.getValue()); + } else { + builder.remove(e.getKey()); + } + }); + return builder.build(); + } + + // Utility class not intended for instantiation. + private GrpcDeviceUtils() {} +} diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java new file mode 100644 index 00000000..b419a346 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java @@ -0,0 +1,75 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +import org.onosproject.incubator.rpc.RemoteServiceContext; +import org.onosproject.net.device.DeviceProviderRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.MoreObjects; + +import io.grpc.ManagedChannel; + +// gRPC Client side +// Probably there should be plug-in mechanism in the future. +/** + * RemoteServiceContext based on gRPC. + * + *

+ * Currently it supports {@link DeviceProviderRegistry}. + */ +public class GrpcRemoteServiceContext implements RemoteServiceContext { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final Map, Object> services = new ConcurrentHashMap<>(); + + private final ManagedChannel channel; + + public GrpcRemoteServiceContext(ManagedChannel channel) { + this.channel = checkNotNull(channel); + services.put(DeviceProviderRegistry.class, new DeviceProviderRegistryClientProxy(channel)); + } + + + @Override + public T get(Class serviceClass) { + @SuppressWarnings("unchecked") + T service = (T) services.get(serviceClass); + if (service != null) { + return service; + } + log.error("{} not supported", serviceClass); + throw new NoSuchElementException(serviceClass.getTypeName() + " not supported"); + } + + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("services", services.keySet()) + .add("channel", channel.authority()) + .toString(); + } + +} diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java new file mode 100644 index 00000000..74962187 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java @@ -0,0 +1,119 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.net.URI; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.onosproject.incubator.rpc.RemoteServiceContext; +import org.onosproject.incubator.rpc.RemoteServiceContextProvider; +import org.onosproject.incubator.rpc.RemoteServiceContextProviderService; +import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry; +import org.onosproject.net.provider.ProviderId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.ManagedChannel; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; + + +// gRPC Client side +/** + * RemoteServiceContextProvider based on gRPC. + */ +@Component(immediate = true) +public class GrpcRemoteServiceProvider implements RemoteServiceContextProvider { + + public static final String GRPC_SCHEME = "grpc"; + + public static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc"; + + private static final ProviderId PID = new ProviderId(GRPC_SCHEME, RPC_PROVIDER_NAME); + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected RemoteServiceProviderRegistry rpcRegistry; + + private Map channels = new ConcurrentHashMap<>(); + + private RemoteServiceContextProviderService providerService; + + + @Activate + protected void activate() { + providerService = rpcRegistry.register(this); + + // FIXME remove me. test code to see if gRPC loads in karaf + //getChannel(URI.create("grpc://localhost:8080")); + + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + rpcRegistry.unregister(this); + + // shutdown all channels + channels.values().stream() + .forEach(ManagedChannel::shutdown); + // Should we wait for shutdown? How? + channels.clear(); + log.info("Stopped"); + } + + @Override + public ProviderId id() { + return PID; + } + + @Override + public RemoteServiceContext get(URI uri) { + // Create gRPC client + return new GrpcRemoteServiceContext(getChannel(uri)); + } + + private ManagedChannel getChannel(URI uri) { + checkArgument(Objects.equals(GRPC_SCHEME, uri.getScheme()), + "Invalid URI scheme: %s", uri.getScheme()); + + return channels.compute(uri, (u, ch) -> { + if (ch != null && !ch.isShutdown()) { + return ch; + } else { + return createChannel(u); + } + }); + } + + private ManagedChannel createChannel(URI uri) { + log.debug("Creating channel for {}", uri); + return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) + .negotiationType(NegotiationType.PLAINTEXT) + .build(); + } + +} diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java new file mode 100644 index 00000000..4f43fa65 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java @@ -0,0 +1,385 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.stream.Collectors.toList; +import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate; +import static org.onosproject.net.DeviceId.deviceId; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Modified; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.onosproject.grpc.Device.DeviceConnected; +import org.onosproject.grpc.Device.DeviceDisconnected; +import org.onosproject.grpc.Device.DeviceProviderMsg; +import org.onosproject.grpc.Device.DeviceProviderServiceMsg; +import org.onosproject.grpc.Device.IsReachableResponse; +import org.onosproject.grpc.Device.PortStatusChanged; +import org.onosproject.grpc.Device.ReceivedRoleReply; +import org.onosproject.grpc.Device.RegisterProvider; +import org.onosproject.grpc.Device.UpdatePortStatistics; +import org.onosproject.grpc.Device.UpdatePorts; +import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc; +import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc; +import org.onosproject.net.DeviceId; +import org.onosproject.net.MastershipRole; +import org.onosproject.net.device.DeviceProvider; +import org.onosproject.net.device.DeviceProviderRegistry; +import org.onosproject.net.device.DeviceProviderService; +import org.onosproject.net.provider.ProviderId; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Sets; + +import io.grpc.Server; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; + +// gRPC Server on Metro-side +// Translates request received on RPC channel, and calls corresponding Service on +// Metro-ONOS cluster. +/** + * Server side implementation of gRPC based RemoteService. + */ +@Component(immediate = true) +public class GrpcRemoteServiceServer { + + private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc"; + + // TODO pick a number + public static final int DEFAULT_LISTEN_PORT = 11984; + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceProviderRegistry deviceProviderRegistry; + + + @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT, + label = "Port to listen on") + protected int listenPort = DEFAULT_LISTEN_PORT; + + private Server server; + private final Set registeredProviders = Sets.newConcurrentHashSet(); + + @Activate + protected void activate(ComponentContext context) throws IOException { + modified(context); + + log.debug("Server starting on {}", listenPort); + try { + server = NettyServerBuilder.forPort(listenPort) + .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy())) + .build().start(); + } catch (IOException e) { + log.error("Failed to start gRPC server", e); + throw e; + } + + log.info("Started on {}", listenPort); + } + + @Deactivate + protected void deactivate() { + + registeredProviders.stream() + .forEach(deviceProviderRegistry::unregister); + + server.shutdown(); + // Should we wait for shutdown? + log.info("Stopped"); + } + + @Modified + public void modified(ComponentContext context) { + // TODO support dynamic reconfiguration and restarting server? + } + + // RPC Server-side code + // RPC session Factory + /** + * Relays DeviceProviderRegistry calls from RPC client. + */ + class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc { + + @Override + public StreamObserver register(StreamObserver toDeviceProvider) { + log.trace("DeviceProviderRegistryServerProxy#register called!"); + + DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider); + + return new DeviceProviderServiceServerProxy(provider, toDeviceProvider); + } + } + + // Lower -> Upper Controller message + // RPC Server-side code + // RPC session handler + private final class DeviceProviderServiceServerProxy + implements StreamObserver { + + // intentionally shadowing + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final DeviceProviderServerProxy pairedProvider; + private final StreamObserver toDeviceProvider; + + private final Cache> outstandingIsReachable; + + // wrapped providerService + private DeviceProviderService deviceProviderService; + + + DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider, + StreamObserver toDeviceProvider) { + this.pairedProvider = provider; + this.toDeviceProvider = toDeviceProvider; + outstandingIsReachable = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(); + + // pair RPC session in other direction + provider.pair(this); + } + + @Override + public void onNext(DeviceProviderServiceMsg msg) { + try { + log.trace("DeviceProviderServiceServerProxy received: {}", msg); + onMethod(msg); + } catch (Exception e) { + log.error("Exception thrown handling {}", msg, e); + onError(e); + throw e; + } + } + + /** + * Translates received RPC message to {@link DeviceProviderService} method calls. + * @param msg DeviceProviderService message + */ + private void onMethod(DeviceProviderServiceMsg msg) { + switch (msg.getMethodCase()) { + case REGISTER_PROVIDER: + RegisterProvider registerProvider = msg.getRegisterProvider(); + // TODO Do we care about provider name? + pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME)); + registeredProviders.add(pairedProvider); + deviceProviderService = deviceProviderRegistry.register(pairedProvider); + break; + + case DEVICE_CONNECTED: + DeviceConnected deviceConnected = msg.getDeviceConnected(); + deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()), + translate(deviceConnected.getDeviceDescription())); + break; + case DEVICE_DISCONNECTED: + DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected(); + deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId())); + break; + case UPDATE_PORTS: + UpdatePorts updatePorts = msg.getUpdatePorts(); + deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()), + updatePorts.getPortDescriptionsList() + .stream() + .map(GrpcDeviceUtils::translate) + .collect(toList())); + break; + case PORT_STATUS_CHANGED: + PortStatusChanged portStatusChanged = msg.getPortStatusChanged(); + deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()), + translate(portStatusChanged.getPortDescription())); + break; + case RECEIVED_ROLE_REPLY: + ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply(); + deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()), + translate(receivedRoleReply.getRequested()), + translate(receivedRoleReply.getResponse())); + break; + case UPDATE_PORT_STATISTICS: + UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics(); + deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()), + updatePortStatistics.getPortStatisticsList() + .stream() + .map(GrpcDeviceUtils::translate) + .collect(toList())); + break; + + // return value of DeviceProvider#isReachable + case IS_REACHABLE_RESPONSE: + IsReachableResponse isReachableResponse = msg.getIsReachableResponse(); + int xid = isReachableResponse.getXid(); + boolean isReachable = isReachableResponse.getIsReachable(); + CompletableFuture result = outstandingIsReachable.asMap().remove(xid); + if (result != null) { + result.complete(isReachable); + } + break; + + case METHOD_NOT_SET: + default: + log.warn("Unexpected message received {}", msg); + break; + } + } + + @Override + public void onCompleted() { + log.info("DeviceProviderServiceServerProxy completed"); + deviceProviderRegistry.unregister(pairedProvider); + registeredProviders.remove(pairedProvider); + toDeviceProvider.onCompleted(); + } + + @Override + public void onError(Throwable e) { + log.error("DeviceProviderServiceServerProxy#onError", e); + deviceProviderRegistry.unregister(pairedProvider); + registeredProviders.remove(pairedProvider); + // TODO What is the proper clean up for bi-di stream on error? + // sample suggests no-op + toDeviceProvider.onError(e); + } + + + /** + * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value. + * @param xid IsReachable call ID. + * @param reply Future to + */ + void register(int xid, CompletableFuture reply) { + outstandingIsReachable.put(xid, reply); + } + + } + + // Upper -> Lower Controller message + /** + * Relay DeviceProvider calls to RPC client. + */ + private final class DeviceProviderServerProxy + implements DeviceProvider { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + // xid for isReachable calls + private final AtomicInteger xidPool = new AtomicInteger(); + private final StreamObserver toDeviceProvider; + + private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null; + private ProviderId providerId; + + DeviceProviderServerProxy(StreamObserver toDeviceProvider) { + this.toDeviceProvider = toDeviceProvider; + } + + void setProviderId(ProviderId pid) { + this.providerId = pid; + } + + /** + * Registers RPC stream in other direction. + * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy} + */ + void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) { + this.deviceProviderServiceProxy = deviceProviderServiceProxy; + } + + @Override + public void triggerProbe(DeviceId deviceId) { + log.trace("triggerProbe({})", deviceId); + DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); + msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder() + .setDeviceId(deviceId.toString()) + .build()); + DeviceProviderMsg triggerProbeMsg = msgBuilder.build(); + toDeviceProvider.onNext(triggerProbeMsg); + // TODO Catch Exceptions and call onError() + } + + @Override + public void roleChanged(DeviceId deviceId, MastershipRole newRole) { + log.trace("roleChanged({}, {})", deviceId, newRole); + DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); + msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder() + .setDeviceId(deviceId.toString()) + .setNewRole(translate(newRole)) + .build()); + toDeviceProvider.onNext(msgBuilder.build()); + // TODO Catch Exceptions and call onError() + } + + @Override + public boolean isReachable(DeviceId deviceId) { + log.trace("isReachable({})", deviceId); + CompletableFuture result = new CompletableFuture<>(); + final int xid = xidPool.incrementAndGet(); + + DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); + msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder() + .setXid(xid) + .setDeviceId(deviceId.toString()) + .build()); + + // Associate xid and register above future some where + // in DeviceProviderService channel to receive reply + if (deviceProviderServiceProxy != null) { + deviceProviderServiceProxy.register(xid, result); + } + + // send message down RPC + toDeviceProvider.onNext(msgBuilder.build()); + + // wait for reply + try { + return result.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.debug("isReachable({}) was Interrupted", deviceId, e); + Thread.currentThread().interrupt(); + } catch (TimeoutException e) { + log.warn("isReachable({}) Timed out", deviceId, e); + } catch (ExecutionException e) { + log.error("isReachable({}) Execution failed", deviceId, e); + // close session? + } + return false; + // TODO Catch Exceptions and call onError() + } + + @Override + public ProviderId id() { + return checkNotNull(providerId, "not initialized yet"); + } + + } +} diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java new file mode 100644 index 00000000..d667ea77 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * gRPC based RemoteServiceProvider implementation. + */ +package org.onosproject.incubator.rpc.grpc; diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto new file mode 100644 index 00000000..aae46d96 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto @@ -0,0 +1,131 @@ +syntax = "proto3"; +option java_package = "org.onosproject.grpc"; + +import "Port.proto"; +package Device; + +enum DeviceType { + OTHER = 0; + SWITCH = 1; + ROUTER = 2; + ROADM = 3; + OTN = 4; + ROADM_OTN = 5; + FIREWALL = 6; + BALANCER = 7; + IPS = 8; + IDS = 9; + CONTROLLER = 10; + VIRTUAL = 11; + FIBER_SWITCH = 12; + MICROWAVE = 13; +} + +message DeviceDescription { + string device_Uri = 1; + DeviceType type = 2; + string manufacturer = 3; + string hw_version = 4; + string sw_version = 5; + string serial_number = 6; + string chassis_id = 7; + map annotations = 8; +} + +enum MastershipRole { + NONE = 0; + MASTER = 1; + STANDBY = 2; +} + +message DeviceConnected { + // DeviceID as String DeviceId#toString + string device_id = 1; + DeviceDescription device_description = 2; +} + +message DeviceDisconnected { + // DeviceID as String DeviceId#toString + string device_id = 1; +} + +message UpdatePorts { + // DeviceID as String DeviceId#toString + string device_id = 1; + repeated Port.PortDescription port_descriptions= 2; +} + +message PortStatusChanged { + // DeviceID as String DeviceId#toString + string device_id = 1; + Port.PortDescription port_description= 2; +} + +message ReceivedRoleReply { + // DeviceID as String DeviceId#toString + string device_id = 1; + MastershipRole requested = 2; + MastershipRole response = 3; +} + +message UpdatePortStatistics { + // DeviceID as String DeviceId#toString + string device_id = 1; + repeated Port.PortStatistics port_statistics = 2; +} + +message RegisterProvider { + // DeviceProvider's ProviderId scheme + string provider_scheme = 1; +} + +message DeviceProviderServiceMsg { + oneof method { + DeviceConnected device_connected= 1; + DeviceDisconnected device_disconnected = 2; + UpdatePorts update_ports= 3; + PortStatusChanged port_status_changed = 4; + ReceivedRoleReply received_role_reply = 5; + UpdatePortStatistics update_port_statistics = 6; + + // This message is for return value of DeviceProvider#isReachable + IsReachableResponse is_reachable_response = 7; + + // This MUST be the 1st message over the stream + RegisterProvider register_provider = 8; + } +} + +message TriggerProbe { + // DeviceID as String DeviceId#toString + string device_id = 1; +} + +message RoleChanged { + // DeviceID as String DeviceId#toString + string device_id = 1; + MastershipRole new_role = 2; +} + +message IsReachableRequest { + int32 xid = 1; + // DeviceID as String DeviceId#toString + string device_id = 2; +} + +message IsReachableResponse { + int32 xid = 1; + bool is_reachable = 2; +} + +message DeviceProviderMsg { + oneof method { + TriggerProbe trigger_probe = 1; + RoleChanged role_changed = 2; + IsReachableRequest is_reachable_request= 3; + } +} + +service DeviceProviderRegistryRpc { + rpc Register(stream DeviceProviderServiceMsg) returns (stream DeviceProviderMsg); +} diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto new file mode 100644 index 00000000..f32193cc --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; +option java_package = "org.onosproject.grpc"; + +package Port; + +enum PortType { + // Signifies copper-based connectivity. + COPPER = 0; + // Signifies optical fiber-based connectivity. + FIBER = 1; + // Signifies optical fiber-based packet port. + PACKET = 2; + // Signifies optical fiber-based optical tributary port (called T-port). + //The signal from the client side will be formed into a ITU G.709 (OTN) frame. + ODUCLT = 3; + // Signifies optical fiber-based Line-side port (called L-port). + OCH = 4; + // Signifies optical fiber-based WDM port (called W-port). + //Optical Multiplexing Section (See ITU G.709). + OMS = 5; + // Signifies virtual port. + VIRTUAL = 6; +} + +// TODO What are we going to do with more specific PortDescription ... +message PortDescription { + // PortNumber as String PortNumber#toString + string port_number = 1; + bool is_enabled = 2; + PortType type = 3; + int64 port_speed = 4; + map annotations = 8; +} + +message PortStatistics { + int32 port = 1; + int64 packets_received = 2; + int64 packets_sent = 3; + // TODO add all other fields +} diff --git a/framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java b/framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java new file mode 100644 index 00000000..69db5714 --- /dev/null +++ b/framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java @@ -0,0 +1,398 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.rpc.grpc; + +import static org.junit.Assert.*; +import static org.onosproject.net.DeviceId.deviceId; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.RandomUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.packet.ChassisId; +import org.onosproject.incubator.rpc.RemoteServiceContext; +import org.onosproject.incubator.rpc.RemoteServiceContextProvider; +import org.onosproject.incubator.rpc.RemoteServiceContextProviderService; +import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.Device.Type; +import org.onosproject.net.DeviceId; +import org.onosproject.net.MastershipRole; +import org.onosproject.net.PortNumber; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.device.DefaultDeviceDescription; +import org.onosproject.net.device.DefaultPortDescription; +import org.onosproject.net.device.DeviceDescription; +import org.onosproject.net.device.DeviceProvider; +import org.onosproject.net.device.DeviceProviderRegistry; +import org.onosproject.net.device.DeviceProviderService; +import org.onosproject.net.device.PortDescription; +import org.onosproject.net.device.PortStatistics; +import org.onosproject.net.provider.AbstractProviderRegistry; +import org.onosproject.net.provider.AbstractProviderService; +import org.onosproject.net.provider.ProviderId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; + +/** + * Set of tests of the gRPC RemoteService components. + */ +public class GrpcRemoteServiceTest { + + private static final DeviceId DEVICE_ID = deviceId("dev:000001"); + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final ProviderId PID = new ProviderId("test", "com.exmaple.test"); + + private static final URI DURI = URI.create("dev:000001"); + + private static final String MFR = "mfr"; + + private static final String HW = "hw"; + + private static final String SW = "sw"; + + private static final String SN = "serial"; + + private static final ChassisId CHASSIS = new ChassisId(42); + + private static final SparseAnnotations ANON = DefaultAnnotations.builder() + .set("foo", "var") + .build(); + + private static final PortNumber PORT = PortNumber.portNumber(99); + + private static final DeviceDescription DDESC + = new DefaultDeviceDescription(DURI, Type.SWITCH, MFR, HW, SW, SN, + CHASSIS, ANON); + + private GrpcRemoteServiceServer server; + private GrpcRemoteServiceProvider client; + + private DeviceProvider svSideDeviceProvider; + + private MTestDeviceProviderService svDeviceProviderService; + + private CountDownLatch serverReady; + + private URI uri; + + public static int pickListenPort() { + try { + // pick unused port + ServerSocket socket = new ServerSocket(0); + int port = socket.getLocalPort(); + socket.close(); + return port; + } catch (IOException e) { + // something went wrong, try picking randomly + return RandomUtils.nextInt(49152, 0xFFFF + 1); + } + } + + @Before + public void setUp() throws Exception { + serverReady = new CountDownLatch(1); + server = new GrpcRemoteServiceServer(); + server.deviceProviderRegistry = new MTestDeviceProviderRegistry(); + // todo: pass proper ComponentContext + server.listenPort = pickListenPort(); + uri = URI.create("grpc://localhost:" + server.listenPort); + server.activate(null); + + client = new GrpcRemoteServiceProvider(); + client.rpcRegistry = new NoOpRemoteServiceProviderRegistry(); + client.activate(); + } + + @After + public void tearDown() { + client.deactivate(); + server.deactivate(); + } + + private static void assertEqualsButNotSame(Object expected, Object actual) { + assertEquals(expected, actual); + assertNotSame("Cannot be same instance if it properly went through gRPC", + expected, actual); + } + + @Test + public void basics() throws InterruptedException { + RemoteServiceContext remoteServiceContext = client.get(uri); + assertNotNull(remoteServiceContext); + + DeviceProviderRegistry deviceProviderRegistry = remoteServiceContext.get(DeviceProviderRegistry.class); + assertNotNull(deviceProviderRegistry); + + CTestDeviceProvider clDeviceProvider = new CTestDeviceProvider(); + DeviceProviderService clDeviceProviderService = deviceProviderRegistry.register(clDeviceProvider); + + assertTrue(serverReady.await(10, TimeUnit.SECONDS)); + + // client to server communication + clDeviceProviderService.deviceConnected(DEVICE_ID, DDESC); + assertTrue(svDeviceProviderService.deviceConnected.await(10, TimeUnit.SECONDS)); + assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.deviceConnectedDid); + assertEqualsButNotSame(DDESC, svDeviceProviderService.deviceConnectedDesc); + + PortDescription portDescription = new DefaultPortDescription(PORT, true, ANON); + List portDescriptions = ImmutableList.of(portDescription); + clDeviceProviderService.updatePorts(DEVICE_ID, portDescriptions); + assertTrue(svDeviceProviderService.updatePorts.await(10, TimeUnit.SECONDS)); + assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.updatePortsDid); + assertEqualsButNotSame(portDescriptions, svDeviceProviderService.updatePortsDescs); + + MastershipRole cRole = MastershipRole.MASTER; + MastershipRole dRole = MastershipRole.STANDBY; + clDeviceProviderService.receivedRoleReply(DEVICE_ID, cRole, dRole); + assertTrue(svDeviceProviderService.receivedRoleReply.await(10, TimeUnit.SECONDS)); + assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.receivedRoleReplyDid); + assertEquals(cRole, svDeviceProviderService.receivedRoleReplyRequested); + assertEquals(dRole, svDeviceProviderService.receivedRoleReplyResponse); + + clDeviceProviderService.portStatusChanged(DEVICE_ID, portDescription); + assertTrue(svDeviceProviderService.portStatusChanged.await(10, TimeUnit.SECONDS)); + assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.portStatusChangedDid); + assertEqualsButNotSame(portDescription, svDeviceProviderService.portStatusChangedDesc); + + Collection portStatistics = Collections.emptyList(); + clDeviceProviderService.updatePortStatistics(DEVICE_ID, portStatistics); + assertTrue(svDeviceProviderService.updatePortStatistics.await(10, TimeUnit.SECONDS)); + assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.updatePortStatisticsDid); + assertEqualsButNotSame(portStatistics, svDeviceProviderService.updatePortStatisticsStats); + + clDeviceProviderService.deviceDisconnected(DEVICE_ID); + assertTrue(svDeviceProviderService.deviceDisconnected.await(10, TimeUnit.SECONDS)); + assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.deviceDisconnectedDid); + + + + // server to client communication + svSideDeviceProvider.triggerProbe(DEVICE_ID); + assertTrue(clDeviceProvider.triggerProbe.await(10, TimeUnit.SECONDS)); + assertEquals(DEVICE_ID, clDeviceProvider.triggerProbeDid); + assertNotSame("Cannot be same instance if it properly went through gRPC", + DEVICE_ID, clDeviceProvider.triggerProbeDid); + + svSideDeviceProvider.roleChanged(DEVICE_ID, MastershipRole.STANDBY); + assertTrue(clDeviceProvider.roleChanged.await(10, TimeUnit.SECONDS)); + assertEquals(DEVICE_ID, clDeviceProvider.roleChangedDid); + assertNotSame("Cannot be same instance if it properly went through gRPC", + DEVICE_ID, clDeviceProvider.roleChangedDid); + assertEquals(MastershipRole.STANDBY, clDeviceProvider.roleChangedNewRole); + + clDeviceProvider.isReachableReply = false; + assertEquals(clDeviceProvider.isReachableReply, + svSideDeviceProvider.isReachable(DEVICE_ID)); + assertTrue(clDeviceProvider.isReachable.await(10, TimeUnit.SECONDS)); + assertEquals(DEVICE_ID, clDeviceProvider.isReachableDid); + assertNotSame("Cannot be same instance if it properly went through gRPC", + DEVICE_ID, clDeviceProvider.isReachableDid); + } + + /** + * Device Provider on CO side. + */ + public class CTestDeviceProvider implements DeviceProvider { + + final CountDownLatch triggerProbe = new CountDownLatch(1); + DeviceId triggerProbeDid; + + final CountDownLatch roleChanged = new CountDownLatch(1); + DeviceId roleChangedDid; + MastershipRole roleChangedNewRole; + + final CountDownLatch isReachable = new CountDownLatch(1); + DeviceId isReachableDid; + boolean isReachableReply = false; + + @Override + public ProviderId id() { + return PID; + } + + @Override + public void triggerProbe(DeviceId deviceId) { + log.info("triggerProbe({}) on Client called", deviceId); + triggerProbeDid = deviceId; + triggerProbe.countDown(); + } + + @Override + public void roleChanged(DeviceId deviceId, MastershipRole newRole) { + log.info("roleChanged({},{}) on Client called", deviceId, newRole); + roleChangedDid = deviceId; + roleChangedNewRole = newRole; + roleChanged.countDown(); + } + + @Override + public boolean isReachable(DeviceId deviceId) { + log.info("isReachable({}) on Client called", deviceId); + isReachableDid = deviceId; + isReachable.countDown(); + return isReachableReply; + } + + } + + class NoOpRemoteServiceProviderRegistry + implements RemoteServiceProviderRegistry { + + @Override + public RemoteServiceContextProviderService register(RemoteServiceContextProvider provider) { + return new RemoteServiceContextProviderService() { + + @Override + public RemoteServiceContextProvider provider() { + return provider; + } + }; + } + + @Override + public void unregister(RemoteServiceContextProvider provider) { + } + + @Override + public Set getProviders() { + return Collections.emptySet(); + } + } + + /** + * DeviceProvider on Metro side. + */ + public class MTestDeviceProviderRegistry + extends AbstractProviderRegistry + implements DeviceProviderRegistry { + + @Override + protected DeviceProviderService createProviderService(DeviceProvider provider) { + log.info("createProviderService({})", provider); + svSideDeviceProvider = provider; + svDeviceProviderService = new MTestDeviceProviderService(provider); + serverReady.countDown(); + return svDeviceProviderService; + } + + } + + private final class MTestDeviceProviderService + extends AbstractProviderService + implements DeviceProviderService { + + public MTestDeviceProviderService(DeviceProvider provider) { + super(provider); + } + + + final CountDownLatch deviceConnected = new CountDownLatch(1); + DeviceId deviceConnectedDid; + DeviceDescription deviceConnectedDesc; + + @Override + public void deviceConnected(DeviceId deviceId, + DeviceDescription deviceDescription) { + log.info("deviceConnected({}, {}) on Server called", deviceId, deviceDescription); + deviceConnectedDid = deviceId; + deviceConnectedDesc = deviceDescription; + deviceConnected.countDown(); + } + + + final CountDownLatch updatePorts = new CountDownLatch(1); + DeviceId updatePortsDid; + List updatePortsDescs; + + @Override + public void updatePorts(DeviceId deviceId, + List portDescriptions) { + log.info("updatePorts({}, {}) on Server called", deviceId, portDescriptions); + updatePortsDid = deviceId; + updatePortsDescs = portDescriptions; + updatePorts.countDown(); + } + + final CountDownLatch receivedRoleReply = new CountDownLatch(1); + DeviceId receivedRoleReplyDid; + MastershipRole receivedRoleReplyRequested; + MastershipRole receivedRoleReplyResponse; + + @Override + public void receivedRoleReply(DeviceId deviceId, MastershipRole requested, + MastershipRole response) { + log.info("receivedRoleReply({}, {}, {}) on Server called", deviceId, requested, response); + receivedRoleReplyDid = deviceId; + receivedRoleReplyRequested = requested; + receivedRoleReplyResponse = response; + receivedRoleReply.countDown(); + } + + final CountDownLatch portStatusChanged = new CountDownLatch(1); + DeviceId portStatusChangedDid; + PortDescription portStatusChangedDesc; + + + @Override + public void portStatusChanged(DeviceId deviceId, + PortDescription portDescription) { + log.info("portStatusChanged({}, {}) on Server called", deviceId, portDescription); + portStatusChangedDid = deviceId; + portStatusChangedDesc = portDescription; + portStatusChanged.countDown(); + } + + final CountDownLatch updatePortStatistics = new CountDownLatch(1); + DeviceId updatePortStatisticsDid; + Collection updatePortStatisticsStats; + + + @Override + public void updatePortStatistics(DeviceId deviceId, + Collection portStatistics) { + log.info("updatePortStatistics({}, {}) on Server called", deviceId, portStatistics); + updatePortStatisticsDid = deviceId; + updatePortStatisticsStats = portStatistics; + updatePortStatistics.countDown(); + } + + final CountDownLatch deviceDisconnected = new CountDownLatch(1); + DeviceId deviceDisconnectedDid; + + @Override + public void deviceDisconnected(DeviceId deviceId) { + log.info("deviceDisconnected({}) on Server called", deviceId); + deviceDisconnectedDid = deviceId; + deviceDisconnected.countDown(); + } + } + +} -- cgit 1.2.3-korg