aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/rpc-grpc
diff options
context:
space:
mode:
authorAshlee Young <ashlee@wildernessvoice.com>2015-12-01 05:49:27 -0800
committerAshlee Young <ashlee@wildernessvoice.com>2015-12-01 05:49:27 -0800
commite63291850fd0795c5700e25e67e5dee89ba54c5f (patch)
tree9707289536ad95bb739c9856761ad43275e07d8c /framework/src/onos/incubator/rpc-grpc
parent671823e12bc13be9a8b87a5d7de33da1bb7a44e8 (diff)
onos commit hash c2999f30c69e50df905a9d175ef80b3f23a98514
Change-Id: I2bb8562c4942b6d6a6d60b663db2e17540477b81 Signed-off-by: Ashlee Young <ashlee@wildernessvoice.com>
Diffstat (limited to 'framework/src/onos/incubator/rpc-grpc')
-rw-r--r--framework/src/onos/incubator/rpc-grpc/features.xml37
-rw-r--r--framework/src/onos/incubator/rpc-grpc/pom.xml269
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java77
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java295
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java381
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java75
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java119
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java385
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java20
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto131
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto40
-rw-r--r--framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java398
12 files changed, 2227 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/rpc-grpc/features.xml b/framework/src/onos/incubator/rpc-grpc/features.xml
new file mode 100644
index 00000000..df768fbb
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/features.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Copyright 2015 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
+ <feature name="${project.artifactId}" version="${project.version}"
+ description="${project.description}">
+ <feature>onos-api</feature>
+ <bundle>mvn:com.google.protobuf/protobuf-java/3.0.0-beta-1</bundle>
+ <bundle>mvn:io.netty/netty-common/4.1.0.Beta6</bundle>
+ <bundle>mvn:io.netty/netty-buffer/4.1.0.Beta6</bundle>
+ <bundle>mvn:io.netty/netty-transport/4.1.0.Beta6</bundle>
+ <bundle>mvn:io.netty/netty-handler/4.1.0.Beta6</bundle>
+ <bundle>mvn:io.netty/netty-codec/4.1.0.Beta6</bundle>
+ <bundle>mvn:io.netty/netty-codec-http/4.1.0.Beta6</bundle>
+ <bundle>mvn:io.netty/netty-codec-http2/4.1.0.Beta6</bundle>
+ <bundle>mvn:io.netty/netty-resolver/4.1.0.Beta6</bundle>
+ <bundle>mvn:com.twitter/hpack/0.11.0</bundle>
+ <!-- TODO: Create shaded jar for these. -->
+ <bundle>wrap:mvn:com.google.auth/google-auth-library-credentials/0.3.0$Bundle-SymbolicName=com.google.auth.google-auth-library-credentials&amp;Bundle-Version=0.3.0</bundle>
+ <bundle>wrap:mvn:com.google.auth/google-auth-library-oauth2-http/0.3.0$Bundle-SymbolicName=com.google.auth.google-auth-library-oauth2-http&amp;Bundle-Version=0.3.0</bundle>
+ <bundle>wrap:mvn:io.grpc/grpc-all/0.9.0$Bundle-SymbolicName=io.grpc.grpc-all&amp;Bundle-Version=0.9.0&amp;Import-Package=io.netty.*;version=4.1.0.Beta6,javax.net.ssl,com.google.protobuf.nano;resolution:=optional,okio;resolution:=optional,*</bundle>
+ <bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
+ </feature>
+</features>
diff --git a/framework/src/onos/incubator/rpc-grpc/pom.xml b/framework/src/onos/incubator/rpc-grpc/pom.xml
new file mode 100644
index 00000000..f528ca53
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/pom.xml
@@ -0,0 +1,269 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2015 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>onos-incubator</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>onos-incubator-rpc-grpc</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>ONOS inter-cluster RPC based on gRPC</description>
+ <url>http://onosproject.org</url>
+
+ <properties>
+ <onos.app.name>org.onosproject.incubator.rpc.grpc</onos.app.name>
+ <onos.app.requires>org.onosproject.incubator.rpc</onos.app.requires>
+ <!-- Note: update feature.xml when updating -->
+ <grpc.version>0.9.0</grpc.version>
+ <grpc.netty.version>4.1.0.Beta6</grpc.netty.version>
+ </properties>
+
+ <pluginRepositories>
+ <pluginRepository>
+ <id>protoc-plugin</id>
+ <url>https://dl.bintray.com/sergei-ivanov/maven/</url>
+ </pluginRepository>
+ </pluginRepositories>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-osgi</artifactId>
+ </dependency>
+<!--
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+-->
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-auth</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.4.0.Final</version>
+ </extension>
+ </extensions>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-scr-srcdescriptor</id>
+ <goals>
+ <goal>scr</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <!-- avoid searching into wrong source path -->
+ <scanClasses>true</scanClasses>
+ <supportedProjectTypes>
+ <supportedProjectType>bundle</supportedProjectType>
+ <supportedProjectType>war</supportedProjectType>
+ </supportedProjectTypes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>cfg</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>cfg</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>swagger</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>swagger</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>app</id>
+ <phase>package</phase>
+ <goals>
+ <goal>app</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>com.google.protobuf.tools</groupId>
+ <artifactId>maven-protoc-plugin</artifactId>
+ <version>0.4.2</version>
+ <configuration>
+ <!-- The version of protoc must match protobuf-java. If you don't
+ depend on protobuf-java directly, you will be transitively depending on the
+ protobuf-java version that grpc depends on. -->
+ <protocArtifact>com.google.protobuf:protoc:3.0.0-beta-1:exe:${os.detected.classifier}</protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/protobuf/java</source>
+ <source>${project.build.directory}/generated-sources/protobuf/grpc-java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <!-- gRPC requires more recent version of netty -->
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ <version>${grpc.netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ <version>${grpc.netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${grpc.netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${grpc.netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>${grpc.netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>hpack</artifactId>
+ <!-- 0.11.0 and later are published as a bundle -->
+ <version>0.11.0</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+</project>
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java
new file mode 100644
index 00000000..cad0fbb6
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderRegistryClientProxy.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import java.util.Map;
+
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.AbstractProviderRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import io.grpc.Channel;
+import io.grpc.ManagedChannel;
+
+// gRPC Client side
+/**
+ * Proxy object to handle DeviceProviderRegistry calls.
+ *
+ * RPC wise, this will start/stop bidirectional streaming service sessions.
+ */
+final class DeviceProviderRegistryClientProxy
+ extends AbstractProviderRegistry<DeviceProvider, DeviceProviderService>
+ implements DeviceProviderRegistry {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final Channel channel;
+
+ private final Map<DeviceProvider, DeviceProviderServiceClientProxy> pServices;
+
+ DeviceProviderRegistryClientProxy(ManagedChannel channel) {
+ this.channel = channel;
+ pServices = Maps.newIdentityHashMap();
+ }
+
+ @Override
+ protected synchronized DeviceProviderService createProviderService(DeviceProvider provider) {
+
+ // Create session
+ DeviceProviderServiceClientProxy pService = new DeviceProviderServiceClientProxy(provider, channel);
+ log.debug("Created DeviceProviderServiceClientProxy", pService);
+
+ DeviceProviderServiceClientProxy old = pServices.put(provider, pService);
+ if (old != null) {
+ // sanity check, can go away
+ log.warn("Duplicate registration detected for {}", provider.id());
+ }
+ return pService;
+ }
+
+ @Override
+ public synchronized void unregister(DeviceProvider provider) {
+ DeviceProviderServiceClientProxy pService = pServices.remove(provider);
+ log.debug("Unregistering DeviceProviderServiceClientProxy", pService);
+ super.unregister(provider);
+ if (pService != null) {
+ pService.shutdown();
+ }
+ }
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java
new file mode 100644
index 00000000..498011f2
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/DeviceProviderServiceClientProxy.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.stream.Collectors.toList;
+import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
+import static org.onosproject.net.DeviceId.deviceId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.onosproject.grpc.Device.DeviceProviderMsg;
+import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
+import org.onosproject.grpc.Device.IsReachableRequest;
+import org.onosproject.grpc.Device.RoleChanged;
+import org.onosproject.grpc.Device.TriggerProbe;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.provider.AbstractProviderService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+
+// gRPC Client side
+// gRPC wise, this object represents bidirectional streaming service session
+// and deals with outgoing message stream
+/**
+ * DeviceProviderService instance associated with given DeviceProvider.
+ */
+final class DeviceProviderServiceClientProxy
+ extends AbstractProviderService<DeviceProvider>
+ implements DeviceProviderService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final StreamObserver<DeviceProviderServiceMsg> devProvService;
+ private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
+
+ private final Channel channel;
+
+ DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
+ super(provider);
+ this.channel = channel;
+
+ DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
+ log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
+ devProvService = stub.register(new DeviceProviderClientProxy(provider));
+
+ // send initialize message
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setRegisterProvider(builder.getRegisterProviderBuilder()
+ .setProviderScheme(provider.id().scheme())
+ .build());
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void deviceConnected(DeviceId deviceId,
+ DeviceDescription deviceDescription) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
+ .setDeviceId(deviceId.toString())
+ .setDeviceDescription(translate(deviceDescription))
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void deviceDisconnected(DeviceId deviceId) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
+ .setDeviceId(deviceId.toString())
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void updatePorts(DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ List<org.onosproject.grpc.Port.PortDescription> portDescs =
+ portDescriptions.stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList());
+
+ builder.setUpdatePorts(builder.getUpdatePortsBuilder()
+ .setDeviceId(deviceId.toString())
+ .addAllPortDescriptions(portDescs)
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void portStatusChanged(DeviceId deviceId,
+ PortDescription portDescription) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
+ .setDeviceId(deviceId.toString())
+ .setPortDescription(translate(portDescription))
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
+ MastershipRole response) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
+ .setDeviceId(deviceId.toString())
+ .setRequested(translate(requested))
+ .setResponse(translate(response))
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ @Override
+ public void updatePortStatistics(DeviceId deviceId,
+ Collection<PortStatistics> portStatistics) {
+ checkValidity();
+
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ List<org.onosproject.grpc.Port.PortStatistics> portStats =
+ portStatistics.stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList());
+ builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
+ .setDeviceId(deviceId.toString())
+ .addAllPortStatistics(portStats)
+ .build());
+
+ devProvService.onNext(builder.build());
+ }
+
+ /**
+ * Shutdown this session.
+ */
+ public void shutdown() {
+ if (hasShutdown.compareAndSet(false, true)) {
+ log.info("Shutting down session over {}", channel.authority());
+ // initiate clean shutdown from client
+ devProvService.onCompleted();
+ invalidate();
+ }
+ }
+
+ /**
+ * Abnormally terminate this session.
+ * @param t error details
+ */
+ public void shutdown(Throwable t) {
+ if (hasShutdown.compareAndSet(false, true)) {
+ log.error("Shutting down session over {}", channel.authority());
+ // initiate abnormal termination from client
+ devProvService.onError(t);
+ invalidate();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("channel", channel.authority())
+ .add("hasShutdown", hasShutdown.get())
+ .toString();
+ }
+
+ // gRPC wise, this object handles incoming message stream
+ /**
+ * Translates DeviceProvider instructions received from RPC to Java calls.
+ */
+ private final class DeviceProviderClientProxy
+ implements StreamObserver<DeviceProviderMsg> {
+
+ private final DeviceProvider provider;
+
+ DeviceProviderClientProxy(DeviceProvider provider) {
+ this.provider = checkNotNull(provider);
+ }
+
+ @Override
+ public void onNext(DeviceProviderMsg msg) {
+ try {
+ log.trace("DeviceProviderClientProxy received: {}", msg);
+ onMethod(msg);
+ } catch (Exception e) {
+ log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
+ // initiate shutdown from client
+ shutdown(e);
+ }
+ }
+
+ /**
+ * Translates received RPC message to {@link DeviceProvider} method calls.
+ * @param msg DeviceProvider message
+ */
+ private void onMethod(DeviceProviderMsg msg) {
+ switch (msg.getMethodCase()) {
+ case TRIGGER_PROBE:
+ TriggerProbe triggerProbe = msg.getTriggerProbe();
+ provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
+ break;
+ case ROLE_CHANGED:
+ RoleChanged roleChanged = msg.getRoleChanged();
+ provider.roleChanged(deviceId(roleChanged.getDeviceId()),
+ translate(roleChanged.getNewRole()));
+ break;
+ case IS_REACHABLE_REQUEST:
+ IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
+ // check if reachable
+ boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
+
+ int xid = isReachableRequest.getXid();
+ // send response back DeviceProviderService channel
+ DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
+ builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
+ .setXid(xid)
+ .setIsReachable(reachable)
+ .build());
+ devProvService.onNext(builder.build());
+ break;
+
+ case METHOD_NOT_SET:
+ default:
+ log.warn("Unexpected method, ignoring", msg);
+ break;
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("DeviceProviderClientProxy completed");
+ // session terminated from remote
+ // TODO unregister...? how?
+
+ //devProvService.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("DeviceProviderClientProxy#onError", t);
+ // session terminated from remote
+ // TODO unregister...? how?
+ //devProvService.onError(t);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("channel", channel.authority())
+ .toString();
+ }
+ }
+}
+
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java
new file mode 100644
index 00000000..7045b0c2
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcDeviceUtils.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onlab.packet.ChassisId;
+import org.onosproject.grpc.Device.DeviceType;
+import org.onosproject.grpc.Port.PortType;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.Port;
+import org.onosproject.net.Port.Type;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DefaultPortStatistics;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.api.client.repackaged.com.google.common.annotations.Beta;
+
+/**
+ * gRPC message conversion related utilities.
+ */
+@Beta
+public final class GrpcDeviceUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(GrpcDeviceUtils.class);
+
+ /**
+ * Translates gRPC enum MastershipRole to ONOS enum.
+ *
+ * @param role mastership role in gRPC enum
+ * @return equivalent in ONOS enum
+ */
+ public static MastershipRole translate(org.onosproject.grpc.Device.MastershipRole role) {
+ switch (role) {
+ case NONE:
+ return MastershipRole.NONE;
+ case MASTER:
+ return MastershipRole.MASTER;
+ case STANDBY:
+ return MastershipRole.STANDBY;
+ case UNRECOGNIZED:
+ log.warn("Unrecognized MastershipRole gRPC message: {}", role);
+ default:
+ return MastershipRole.NONE;
+ }
+ }
+
+ /**
+ * Translates ONOS enum MastershipRole to gRPC enum.
+ *
+ * @param newRole ONOS' mastership role
+ * @return equivalent in gRPC message enum
+ */
+ public static org.onosproject.grpc.Device.MastershipRole translate(MastershipRole newRole) {
+ switch (newRole) {
+ case MASTER:
+ return org.onosproject.grpc.Device.MastershipRole.MASTER;
+ case STANDBY:
+ return org.onosproject.grpc.Device.MastershipRole.STANDBY;
+ case NONE:
+ default:
+ return org.onosproject.grpc.Device.MastershipRole.NONE;
+ }
+ }
+
+
+ /**
+ * Translates gRPC DeviceDescription to {@link DeviceDescription}.
+ *
+ * @param deviceDescription gRPC message
+ * @return {@link DeviceDescription}
+ */
+ public static DeviceDescription translate(org.onosproject.grpc.Device.DeviceDescription deviceDescription) {
+ URI uri = URI.create(deviceDescription.getDeviceUri());
+ Device.Type type = translate(deviceDescription.getType());
+ String manufacturer = deviceDescription.getManufacturer();
+ String hwVersion = deviceDescription.getHwVersion();
+ String swVersion = deviceDescription.getSwVersion();
+ String serialNumber = deviceDescription.getSerialNumber();
+ ChassisId chassis = new ChassisId(deviceDescription.getChassisId());
+ return new DefaultDeviceDescription(uri, type, manufacturer,
+ hwVersion, swVersion, serialNumber,
+ chassis,
+ asAnnotations(deviceDescription.getAnnotations()));
+ }
+
+ /**
+ * Translates {@link DeviceDescription} to gRPC DeviceDescription message.
+ *
+ * @param deviceDescription {@link DeviceDescription}
+ * @return gRPC DeviceDescription message
+ */
+ public static org.onosproject.grpc.Device.DeviceDescription translate(DeviceDescription deviceDescription) {
+
+ return org.onosproject.grpc.Device.DeviceDescription.newBuilder()
+ .setDeviceUri(deviceDescription.deviceUri().toString())
+ .setType(translate(deviceDescription.type()))
+ .setManufacturer(deviceDescription.manufacturer())
+ .setHwVersion(deviceDescription.hwVersion())
+ .setSwVersion(deviceDescription.swVersion())
+ .setSerialNumber(deviceDescription.serialNumber())
+ .setChassisId(deviceDescription.chassisId().toString())
+ .putAllAnnotations(asMap(deviceDescription.annotations()))
+ .build();
+ }
+
+
+ /**
+ * Translates gRPC DeviceType to {@link Device.Type}.
+ *
+ * @param type gRPC message
+ * @return {@link Device.Type}
+ */
+ public static Device.Type translate(org.onosproject.grpc.Device.DeviceType type) {
+ switch (type) {
+ case BALANCER:
+ return Device.Type.BALANCER;
+ case CONTROLLER:
+ return Device.Type.CONTROLLER;
+ case FIBER_SWITCH:
+ return Device.Type.FIBER_SWITCH;
+ case FIREWALL:
+ return Device.Type.FIREWALL;
+ case IDS:
+ return Device.Type.IDS;
+ case IPS:
+ return Device.Type.IPS;
+ case MICROWAVE:
+ return Device.Type.MICROWAVE;
+ case OTHER:
+ return Device.Type.OTHER;
+ case OTN:
+ return Device.Type.OTN;
+ case ROADM:
+ return Device.Type.ROADM;
+ case ROADM_OTN:
+ return Device.Type.ROADM_OTN;
+ case ROUTER:
+ return Device.Type.ROUTER;
+ case SWITCH:
+ return Device.Type.SWITCH;
+ case VIRTUAL:
+ return Device.Type.VIRTUAL;
+
+ case UNRECOGNIZED:
+ default:
+ log.warn("Unexpected DeviceType: {}", type);
+ return Device.Type.OTHER;
+ }
+ }
+
+ /**
+ * Translates {@link Type} to gRPC DeviceType.
+ *
+ * @param type {@link Type}
+ * @return gRPC message
+ */
+ public static DeviceType translate(Device.Type type) {
+ switch (type) {
+ case BALANCER:
+ return DeviceType.BALANCER;
+ case CONTROLLER:
+ return DeviceType.CONTROLLER;
+ case FIBER_SWITCH:
+ return DeviceType.FIBER_SWITCH;
+ case FIREWALL:
+ return DeviceType.FIREWALL;
+ case IDS:
+ return DeviceType.IDS;
+ case IPS:
+ return DeviceType.IPS;
+ case MICROWAVE:
+ return DeviceType.MICROWAVE;
+ case OTHER:
+ return DeviceType.OTHER;
+ case OTN:
+ return DeviceType.OTN;
+ case ROADM:
+ return DeviceType.ROADM;
+ case ROADM_OTN:
+ return DeviceType.ROADM_OTN;
+ case ROUTER:
+ return DeviceType.ROUTER;
+ case SWITCH:
+ return DeviceType.SWITCH;
+ case VIRTUAL:
+ return DeviceType.VIRTUAL;
+
+ default:
+ log.warn("Unexpected Device.Type: {}", type);
+ return DeviceType.OTHER;
+ }
+ }
+
+ /**
+ * Translates gRPC PortDescription message to {@link PortDescription}.
+ *
+ * @param portDescription gRPC message
+ * @return {@link PortDescription}
+ */
+ public static PortDescription translate(org.onosproject.grpc.Port.PortDescription portDescription) {
+ PortNumber number = PortNumber.fromString(portDescription.getPortNumber());
+ boolean isEnabled = portDescription.getIsEnabled();
+ Port.Type type = translate(portDescription.getType());
+ long portSpeed = portDescription.getPortSpeed();
+ SparseAnnotations annotations = asAnnotations(portDescription.getAnnotations());
+ // TODO How to deal with more specific Port...
+ return new DefaultPortDescription(number, isEnabled, type, portSpeed, annotations);
+ }
+
+ /**
+ * Translates {@link PortDescription} to gRPC PortDescription message.
+ *
+ * @param portDescription {@link PortDescription}
+ * @return gRPC PortDescription message
+ */
+ public static org.onosproject.grpc.Port.PortDescription translate(PortDescription portDescription) {
+ // TODO How to deal with more specific Port...
+ return org.onosproject.grpc.Port.PortDescription.newBuilder()
+ .setPortNumber(portDescription.portNumber().toString())
+ .setIsEnabled(portDescription.isEnabled())
+ .setType(translate(portDescription.type()))
+ .setPortSpeed(portDescription.portSpeed())
+ .putAllAnnotations(asMap(portDescription.annotations()))
+ .build();
+ }
+
+ /**
+ * Translates gRPC PortType to {@link Port.Type}.
+ *
+ * @param type gRPC message
+ * @return {@link Port.Type}
+ */
+ public static Port.Type translate(PortType type) {
+ switch (type) {
+ case COPPER:
+ return Type.COPPER;
+ case FIBER:
+ return Type.FIBER;
+ case OCH:
+ return Type.OCH;
+ case ODUCLT:
+ return Type.ODUCLT;
+ case OMS:
+ return Type.OMS;
+ case PACKET:
+ return Type.PACKET;
+ case VIRTUAL:
+ return Type.VIRTUAL;
+
+ case UNRECOGNIZED:
+ default:
+ log.warn("Unexpected PortType: {}", type);
+ return Type.COPPER;
+ }
+ }
+
+ /**
+ * Translates {@link Port.Type} to gRPC PortType.
+ *
+ * @param type {@link Port.Type}
+ * @return gRPC message
+ */
+ public static PortType translate(Port.Type type) {
+ switch (type) {
+ case COPPER:
+ return PortType.COPPER;
+ case FIBER:
+ return PortType.FIBER;
+ case OCH:
+ return PortType.OCH;
+ case ODUCLT:
+ return PortType.ODUCLT;
+ case OMS:
+ return PortType.OMS;
+ case PACKET:
+ return PortType.PACKET;
+ case VIRTUAL:
+ return PortType.VIRTUAL;
+
+ default:
+ log.warn("Unexpected Port.Type: {}", type);
+ return PortType.COPPER;
+ }
+ }
+
+ /**
+ * Translates gRPC PortStatistics message to {@link PortStatistics}.
+ *
+ * @param portStatistics gRPC PortStatistics message
+ * @return {@link PortStatistics}
+ */
+ public static PortStatistics translate(org.onosproject.grpc.Port.PortStatistics portStatistics) {
+ // TODO implement adding missing fields
+ return DefaultPortStatistics.builder()
+ .setPort(portStatistics.getPort())
+ .setPacketsReceived(portStatistics.getPacketsReceived())
+ .setPacketsSent(portStatistics.getPacketsSent())
+ .build();
+ }
+
+ /**
+ * Translates {@link PortStatistics} to gRPC PortStatistics message.
+ *
+ * @param portStatistics {@link PortStatistics}
+ * @return gRPC PortStatistics message
+ */
+ public static org.onosproject.grpc.Port.PortStatistics translate(PortStatistics portStatistics) {
+ // TODO implement adding missing fields
+ return org.onosproject.grpc.Port.PortStatistics.newBuilder()
+ .setPort(portStatistics.port())
+ .setPacketsReceived(portStatistics.packetsReceived())
+ .setPacketsSent(portStatistics.packetsSent())
+ .build();
+ }
+
+ // may be this can be moved to Annotation itself or AnnotationsUtils
+ /**
+ * Converts Annotations to Map of Strings.
+ *
+ * @param annotations {@link Annotations}
+ * @return Map of annotation key and values
+ */
+ public static Map<String, String> asMap(Annotations annotations) {
+ if (annotations instanceof DefaultAnnotations) {
+ return ((DefaultAnnotations) annotations).asMap();
+ }
+ Map<String, String> map = new HashMap<>();
+ annotations.keys()
+ .forEach(k -> map.put(k, annotations.value(k)));
+
+ return map;
+ }
+
+ // may be this can be moved to Annotation itself or AnnotationsUtils
+ /**
+ * Converts Map of Strings to {@link SparseAnnotations}.
+ *
+ * @param annotations Map of annotation key and values
+ * @return {@link SparseAnnotations}
+ */
+ public static SparseAnnotations asAnnotations(Map<String, String> annotations) {
+ DefaultAnnotations.Builder builder = DefaultAnnotations.builder();
+ annotations.entrySet().forEach(e -> {
+ if (e.getValue() != null) {
+ builder.set(e.getKey(), e.getValue());
+ } else {
+ builder.remove(e.getKey());
+ }
+ });
+ return builder.build();
+ }
+
+ // Utility class not intended for instantiation.
+ private GrpcDeviceUtils() {}
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java
new file mode 100644
index 00000000..b419a346
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceContext.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.onosproject.incubator.rpc.RemoteServiceContext;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+
+import io.grpc.ManagedChannel;
+
+// gRPC Client side
+// Probably there should be plug-in mechanism in the future.
+/**
+ * RemoteServiceContext based on gRPC.
+ *
+ * <p>
+ * Currently it supports {@link DeviceProviderRegistry}.
+ */
+public class GrpcRemoteServiceContext implements RemoteServiceContext {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final Map<Class<? extends Object>, Object> services = new ConcurrentHashMap<>();
+
+ private final ManagedChannel channel;
+
+ public GrpcRemoteServiceContext(ManagedChannel channel) {
+ this.channel = checkNotNull(channel);
+ services.put(DeviceProviderRegistry.class, new DeviceProviderRegistryClientProxy(channel));
+ }
+
+
+ @Override
+ public <T> T get(Class<T> serviceClass) {
+ @SuppressWarnings("unchecked")
+ T service = (T) services.get(serviceClass);
+ if (service != null) {
+ return service;
+ }
+ log.error("{} not supported", serviceClass);
+ throw new NoSuchElementException(serviceClass.getTypeName() + " not supported");
+ }
+
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("services", services.keySet())
+ .add("channel", channel.authority())
+ .toString();
+ }
+
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java
new file mode 100644
index 00000000..74962187
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceProvider.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.incubator.rpc.RemoteServiceContext;
+import org.onosproject.incubator.rpc.RemoteServiceContextProvider;
+import org.onosproject.incubator.rpc.RemoteServiceContextProviderService;
+import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry;
+import org.onosproject.net.provider.ProviderId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+
+// gRPC Client side
+/**
+ * RemoteServiceContextProvider based on gRPC.
+ */
+@Component(immediate = true)
+public class GrpcRemoteServiceProvider implements RemoteServiceContextProvider {
+
+ public static final String GRPC_SCHEME = "grpc";
+
+ public static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
+
+ private static final ProviderId PID = new ProviderId(GRPC_SCHEME, RPC_PROVIDER_NAME);
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RemoteServiceProviderRegistry rpcRegistry;
+
+ private Map<URI, ManagedChannel> channels = new ConcurrentHashMap<>();
+
+ private RemoteServiceContextProviderService providerService;
+
+
+ @Activate
+ protected void activate() {
+ providerService = rpcRegistry.register(this);
+
+ // FIXME remove me. test code to see if gRPC loads in karaf
+ //getChannel(URI.create("grpc://localhost:8080"));
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ rpcRegistry.unregister(this);
+
+ // shutdown all channels
+ channels.values().stream()
+ .forEach(ManagedChannel::shutdown);
+ // Should we wait for shutdown? How?
+ channels.clear();
+ log.info("Stopped");
+ }
+
+ @Override
+ public ProviderId id() {
+ return PID;
+ }
+
+ @Override
+ public RemoteServiceContext get(URI uri) {
+ // Create gRPC client
+ return new GrpcRemoteServiceContext(getChannel(uri));
+ }
+
+ private ManagedChannel getChannel(URI uri) {
+ checkArgument(Objects.equals(GRPC_SCHEME, uri.getScheme()),
+ "Invalid URI scheme: %s", uri.getScheme());
+
+ return channels.compute(uri, (u, ch) -> {
+ if (ch != null && !ch.isShutdown()) {
+ return ch;
+ } else {
+ return createChannel(u);
+ }
+ });
+ }
+
+ private ManagedChannel createChannel(URI uri) {
+ log.debug("Creating channel for {}", uri);
+ return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort())
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .build();
+ }
+
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
new file mode 100644
index 00000000..4f43fa65
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
@@ -0,0 +1,385 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.stream.Collectors.toList;
+import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
+import static org.onosproject.net.DeviceId.deviceId;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.grpc.Device.DeviceConnected;
+import org.onosproject.grpc.Device.DeviceDisconnected;
+import org.onosproject.grpc.Device.DeviceProviderMsg;
+import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
+import org.onosproject.grpc.Device.IsReachableResponse;
+import org.onosproject.grpc.Device.PortStatusChanged;
+import org.onosproject.grpc.Device.ReceivedRoleReply;
+import org.onosproject.grpc.Device.RegisterProvider;
+import org.onosproject.grpc.Device.UpdatePortStatistics;
+import org.onosproject.grpc.Device.UpdatePorts;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.ProviderId;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+// gRPC Server on Metro-side
+// Translates request received on RPC channel, and calls corresponding Service on
+// Metro-ONOS cluster.
+/**
+ * Server side implementation of gRPC based RemoteService.
+ */
+@Component(immediate = true)
+public class GrpcRemoteServiceServer {
+
+ private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
+
+ // TODO pick a number
+ public static final int DEFAULT_LISTEN_PORT = 11984;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceProviderRegistry deviceProviderRegistry;
+
+
+ @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
+ label = "Port to listen on")
+ protected int listenPort = DEFAULT_LISTEN_PORT;
+
+ private Server server;
+ private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
+
+ @Activate
+ protected void activate(ComponentContext context) throws IOException {
+ modified(context);
+
+ log.debug("Server starting on {}", listenPort);
+ try {
+ server = NettyServerBuilder.forPort(listenPort)
+ .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
+ .build().start();
+ } catch (IOException e) {
+ log.error("Failed to start gRPC server", e);
+ throw e;
+ }
+
+ log.info("Started on {}", listenPort);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+
+ registeredProviders.stream()
+ .forEach(deviceProviderRegistry::unregister);
+
+ server.shutdown();
+ // Should we wait for shutdown?
+ log.info("Stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ // TODO support dynamic reconfiguration and restarting server?
+ }
+
+ // RPC Server-side code
+ // RPC session Factory
+ /**
+ * Relays DeviceProviderRegistry calls from RPC client.
+ */
+ class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
+
+ @Override
+ public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ log.trace("DeviceProviderRegistryServerProxy#register called!");
+
+ DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
+
+ return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
+ }
+ }
+
+ // Lower -> Upper Controller message
+ // RPC Server-side code
+ // RPC session handler
+ private final class DeviceProviderServiceServerProxy
+ implements StreamObserver<DeviceProviderServiceMsg> {
+
+ // intentionally shadowing
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final DeviceProviderServerProxy pairedProvider;
+ private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+ private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
+
+ // wrapped providerService
+ private DeviceProviderService deviceProviderService;
+
+
+ DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
+ StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ this.pairedProvider = provider;
+ this.toDeviceProvider = toDeviceProvider;
+ outstandingIsReachable = CacheBuilder.newBuilder()
+ .expireAfterWrite(1, TimeUnit.MINUTES)
+ .build();
+
+ // pair RPC session in other direction
+ provider.pair(this);
+ }
+
+ @Override
+ public void onNext(DeviceProviderServiceMsg msg) {
+ try {
+ log.trace("DeviceProviderServiceServerProxy received: {}", msg);
+ onMethod(msg);
+ } catch (Exception e) {
+ log.error("Exception thrown handling {}", msg, e);
+ onError(e);
+ throw e;
+ }
+ }
+
+ /**
+ * Translates received RPC message to {@link DeviceProviderService} method calls.
+ * @param msg DeviceProviderService message
+ */
+ private void onMethod(DeviceProviderServiceMsg msg) {
+ switch (msg.getMethodCase()) {
+ case REGISTER_PROVIDER:
+ RegisterProvider registerProvider = msg.getRegisterProvider();
+ // TODO Do we care about provider name?
+ pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
+ registeredProviders.add(pairedProvider);
+ deviceProviderService = deviceProviderRegistry.register(pairedProvider);
+ break;
+
+ case DEVICE_CONNECTED:
+ DeviceConnected deviceConnected = msg.getDeviceConnected();
+ deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
+ translate(deviceConnected.getDeviceDescription()));
+ break;
+ case DEVICE_DISCONNECTED:
+ DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
+ deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
+ break;
+ case UPDATE_PORTS:
+ UpdatePorts updatePorts = msg.getUpdatePorts();
+ deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
+ updatePorts.getPortDescriptionsList()
+ .stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList()));
+ break;
+ case PORT_STATUS_CHANGED:
+ PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
+ deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
+ translate(portStatusChanged.getPortDescription()));
+ break;
+ case RECEIVED_ROLE_REPLY:
+ ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
+ deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
+ translate(receivedRoleReply.getRequested()),
+ translate(receivedRoleReply.getResponse()));
+ break;
+ case UPDATE_PORT_STATISTICS:
+ UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
+ deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
+ updatePortStatistics.getPortStatisticsList()
+ .stream()
+ .map(GrpcDeviceUtils::translate)
+ .collect(toList()));
+ break;
+
+ // return value of DeviceProvider#isReachable
+ case IS_REACHABLE_RESPONSE:
+ IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
+ int xid = isReachableResponse.getXid();
+ boolean isReachable = isReachableResponse.getIsReachable();
+ CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
+ if (result != null) {
+ result.complete(isReachable);
+ }
+ break;
+
+ case METHOD_NOT_SET:
+ default:
+ log.warn("Unexpected message received {}", msg);
+ break;
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("DeviceProviderServiceServerProxy completed");
+ deviceProviderRegistry.unregister(pairedProvider);
+ registeredProviders.remove(pairedProvider);
+ toDeviceProvider.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ log.error("DeviceProviderServiceServerProxy#onError", e);
+ deviceProviderRegistry.unregister(pairedProvider);
+ registeredProviders.remove(pairedProvider);
+ // TODO What is the proper clean up for bi-di stream on error?
+ // sample suggests no-op
+ toDeviceProvider.onError(e);
+ }
+
+
+ /**
+ * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
+ * @param xid IsReachable call ID.
+ * @param reply Future to
+ */
+ void register(int xid, CompletableFuture<Boolean> reply) {
+ outstandingIsReachable.put(xid, reply);
+ }
+
+ }
+
+ // Upper -> Lower Controller message
+ /**
+ * Relay DeviceProvider calls to RPC client.
+ */
+ private final class DeviceProviderServerProxy
+ implements DeviceProvider {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ // xid for isReachable calls
+ private final AtomicInteger xidPool = new AtomicInteger();
+ private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+ private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
+ private ProviderId providerId;
+
+ DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+ this.toDeviceProvider = toDeviceProvider;
+ }
+
+ void setProviderId(ProviderId pid) {
+ this.providerId = pid;
+ }
+
+ /**
+ * Registers RPC stream in other direction.
+ * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
+ */
+ void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
+ this.deviceProviderServiceProxy = deviceProviderServiceProxy;
+ }
+
+ @Override
+ public void triggerProbe(DeviceId deviceId) {
+ log.trace("triggerProbe({})", deviceId);
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
+ .setDeviceId(deviceId.toString())
+ .build());
+ DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
+ toDeviceProvider.onNext(triggerProbeMsg);
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+ log.trace("roleChanged({}, {})", deviceId, newRole);
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
+ .setDeviceId(deviceId.toString())
+ .setNewRole(translate(newRole))
+ .build());
+ toDeviceProvider.onNext(msgBuilder.build());
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public boolean isReachable(DeviceId deviceId) {
+ log.trace("isReachable({})", deviceId);
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
+ final int xid = xidPool.incrementAndGet();
+
+ DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+ msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
+ .setXid(xid)
+ .setDeviceId(deviceId.toString())
+ .build());
+
+ // Associate xid and register above future some where
+ // in DeviceProviderService channel to receive reply
+ if (deviceProviderServiceProxy != null) {
+ deviceProviderServiceProxy.register(xid, result);
+ }
+
+ // send message down RPC
+ toDeviceProvider.onNext(msgBuilder.build());
+
+ // wait for reply
+ try {
+ return result.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.debug("isReachable({}) was Interrupted", deviceId, e);
+ Thread.currentThread().interrupt();
+ } catch (TimeoutException e) {
+ log.warn("isReachable({}) Timed out", deviceId, e);
+ } catch (ExecutionException e) {
+ log.error("isReachable({}) Execution failed", deviceId, e);
+ // close session?
+ }
+ return false;
+ // TODO Catch Exceptions and call onError()
+ }
+
+ @Override
+ public ProviderId id() {
+ return checkNotNull(providerId, "not initialized yet");
+ }
+
+ }
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java
new file mode 100644
index 00000000..d667ea77
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC based RemoteServiceProvider implementation.
+ */
+package org.onosproject.incubator.rpc.grpc;
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto
new file mode 100644
index 00000000..aae46d96
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Device.proto
@@ -0,0 +1,131 @@
+syntax = "proto3";
+option java_package = "org.onosproject.grpc";
+
+import "Port.proto";
+package Device;
+
+enum DeviceType {
+ OTHER = 0;
+ SWITCH = 1;
+ ROUTER = 2;
+ ROADM = 3;
+ OTN = 4;
+ ROADM_OTN = 5;
+ FIREWALL = 6;
+ BALANCER = 7;
+ IPS = 8;
+ IDS = 9;
+ CONTROLLER = 10;
+ VIRTUAL = 11;
+ FIBER_SWITCH = 12;
+ MICROWAVE = 13;
+}
+
+message DeviceDescription {
+ string device_Uri = 1;
+ DeviceType type = 2;
+ string manufacturer = 3;
+ string hw_version = 4;
+ string sw_version = 5;
+ string serial_number = 6;
+ string chassis_id = 7;
+ map<string, string> annotations = 8;
+}
+
+enum MastershipRole {
+ NONE = 0;
+ MASTER = 1;
+ STANDBY = 2;
+}
+
+message DeviceConnected {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+ DeviceDescription device_description = 2;
+}
+
+message DeviceDisconnected {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+}
+
+message UpdatePorts {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+ repeated Port.PortDescription port_descriptions= 2;
+}
+
+message PortStatusChanged {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+ Port.PortDescription port_description= 2;
+}
+
+message ReceivedRoleReply {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+ MastershipRole requested = 2;
+ MastershipRole response = 3;
+}
+
+message UpdatePortStatistics {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+ repeated Port.PortStatistics port_statistics = 2;
+}
+
+message RegisterProvider {
+ // DeviceProvider's ProviderId scheme
+ string provider_scheme = 1;
+}
+
+message DeviceProviderServiceMsg {
+ oneof method {
+ DeviceConnected device_connected= 1;
+ DeviceDisconnected device_disconnected = 2;
+ UpdatePorts update_ports= 3;
+ PortStatusChanged port_status_changed = 4;
+ ReceivedRoleReply received_role_reply = 5;
+ UpdatePortStatistics update_port_statistics = 6;
+
+ // This message is for return value of DeviceProvider#isReachable
+ IsReachableResponse is_reachable_response = 7;
+
+ // This MUST be the 1st message over the stream
+ RegisterProvider register_provider = 8;
+ }
+}
+
+message TriggerProbe {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+}
+
+message RoleChanged {
+ // DeviceID as String DeviceId#toString
+ string device_id = 1;
+ MastershipRole new_role = 2;
+}
+
+message IsReachableRequest {
+ int32 xid = 1;
+ // DeviceID as String DeviceId#toString
+ string device_id = 2;
+}
+
+message IsReachableResponse {
+ int32 xid = 1;
+ bool is_reachable = 2;
+}
+
+message DeviceProviderMsg {
+ oneof method {
+ TriggerProbe trigger_probe = 1;
+ RoleChanged role_changed = 2;
+ IsReachableRequest is_reachable_request= 3;
+ }
+}
+
+service DeviceProviderRegistryRpc {
+ rpc Register(stream DeviceProviderServiceMsg) returns (stream DeviceProviderMsg);
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto
new file mode 100644
index 00000000..f32193cc
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/main/proto/Port.proto
@@ -0,0 +1,40 @@
+syntax = "proto3";
+option java_package = "org.onosproject.grpc";
+
+package Port;
+
+enum PortType {
+ // Signifies copper-based connectivity.
+ COPPER = 0;
+ // Signifies optical fiber-based connectivity.
+ FIBER = 1;
+ // Signifies optical fiber-based packet port.
+ PACKET = 2;
+ // Signifies optical fiber-based optical tributary port (called T-port).
+ //The signal from the client side will be formed into a ITU G.709 (OTN) frame.
+ ODUCLT = 3;
+ // Signifies optical fiber-based Line-side port (called L-port).
+ OCH = 4;
+ // Signifies optical fiber-based WDM port (called W-port).
+ //Optical Multiplexing Section (See ITU G.709).
+ OMS = 5;
+ // Signifies virtual port.
+ VIRTUAL = 6;
+}
+
+// TODO What are we going to do with more specific PortDescription ...
+message PortDescription {
+ // PortNumber as String PortNumber#toString
+ string port_number = 1;
+ bool is_enabled = 2;
+ PortType type = 3;
+ int64 port_speed = 4;
+ map<string, string> annotations = 8;
+}
+
+message PortStatistics {
+ int32 port = 1;
+ int64 packets_received = 2;
+ int64 packets_sent = 3;
+ // TODO add all other fields
+}
diff --git a/framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java b/framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java
new file mode 100644
index 00000000..69db5714
--- /dev/null
+++ b/framework/src/onos/incubator/rpc-grpc/src/test/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceTest.java
@@ -0,0 +1,398 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static org.junit.Assert.*;
+import static org.onosproject.net.DeviceId.deviceId;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.packet.ChassisId;
+import org.onosproject.incubator.rpc.RemoteServiceContext;
+import org.onosproject.incubator.rpc.RemoteServiceContextProvider;
+import org.onosproject.incubator.rpc.RemoteServiceContextProviderService;
+import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device.Type;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.provider.AbstractProviderRegistry;
+import org.onosproject.net.provider.AbstractProviderService;
+import org.onosproject.net.provider.ProviderId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Set of tests of the gRPC RemoteService components.
+ */
+public class GrpcRemoteServiceTest {
+
+ private static final DeviceId DEVICE_ID = deviceId("dev:000001");
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final ProviderId PID = new ProviderId("test", "com.exmaple.test");
+
+ private static final URI DURI = URI.create("dev:000001");
+
+ private static final String MFR = "mfr";
+
+ private static final String HW = "hw";
+
+ private static final String SW = "sw";
+
+ private static final String SN = "serial";
+
+ private static final ChassisId CHASSIS = new ChassisId(42);
+
+ private static final SparseAnnotations ANON = DefaultAnnotations.builder()
+ .set("foo", "var")
+ .build();
+
+ private static final PortNumber PORT = PortNumber.portNumber(99);
+
+ private static final DeviceDescription DDESC
+ = new DefaultDeviceDescription(DURI, Type.SWITCH, MFR, HW, SW, SN,
+ CHASSIS, ANON);
+
+ private GrpcRemoteServiceServer server;
+ private GrpcRemoteServiceProvider client;
+
+ private DeviceProvider svSideDeviceProvider;
+
+ private MTestDeviceProviderService svDeviceProviderService;
+
+ private CountDownLatch serverReady;
+
+ private URI uri;
+
+ public static int pickListenPort() {
+ try {
+ // pick unused port
+ ServerSocket socket = new ServerSocket(0);
+ int port = socket.getLocalPort();
+ socket.close();
+ return port;
+ } catch (IOException e) {
+ // something went wrong, try picking randomly
+ return RandomUtils.nextInt(49152, 0xFFFF + 1);
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ serverReady = new CountDownLatch(1);
+ server = new GrpcRemoteServiceServer();
+ server.deviceProviderRegistry = new MTestDeviceProviderRegistry();
+ // todo: pass proper ComponentContext
+ server.listenPort = pickListenPort();
+ uri = URI.create("grpc://localhost:" + server.listenPort);
+ server.activate(null);
+
+ client = new GrpcRemoteServiceProvider();
+ client.rpcRegistry = new NoOpRemoteServiceProviderRegistry();
+ client.activate();
+ }
+
+ @After
+ public void tearDown() {
+ client.deactivate();
+ server.deactivate();
+ }
+
+ private static void assertEqualsButNotSame(Object expected, Object actual) {
+ assertEquals(expected, actual);
+ assertNotSame("Cannot be same instance if it properly went through gRPC",
+ expected, actual);
+ }
+
+ @Test
+ public void basics() throws InterruptedException {
+ RemoteServiceContext remoteServiceContext = client.get(uri);
+ assertNotNull(remoteServiceContext);
+
+ DeviceProviderRegistry deviceProviderRegistry = remoteServiceContext.get(DeviceProviderRegistry.class);
+ assertNotNull(deviceProviderRegistry);
+
+ CTestDeviceProvider clDeviceProvider = new CTestDeviceProvider();
+ DeviceProviderService clDeviceProviderService = deviceProviderRegistry.register(clDeviceProvider);
+
+ assertTrue(serverReady.await(10, TimeUnit.SECONDS));
+
+ // client to server communication
+ clDeviceProviderService.deviceConnected(DEVICE_ID, DDESC);
+ assertTrue(svDeviceProviderService.deviceConnected.await(10, TimeUnit.SECONDS));
+ assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.deviceConnectedDid);
+ assertEqualsButNotSame(DDESC, svDeviceProviderService.deviceConnectedDesc);
+
+ PortDescription portDescription = new DefaultPortDescription(PORT, true, ANON);
+ List<PortDescription> portDescriptions = ImmutableList.of(portDescription);
+ clDeviceProviderService.updatePorts(DEVICE_ID, portDescriptions);
+ assertTrue(svDeviceProviderService.updatePorts.await(10, TimeUnit.SECONDS));
+ assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.updatePortsDid);
+ assertEqualsButNotSame(portDescriptions, svDeviceProviderService.updatePortsDescs);
+
+ MastershipRole cRole = MastershipRole.MASTER;
+ MastershipRole dRole = MastershipRole.STANDBY;
+ clDeviceProviderService.receivedRoleReply(DEVICE_ID, cRole, dRole);
+ assertTrue(svDeviceProviderService.receivedRoleReply.await(10, TimeUnit.SECONDS));
+ assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.receivedRoleReplyDid);
+ assertEquals(cRole, svDeviceProviderService.receivedRoleReplyRequested);
+ assertEquals(dRole, svDeviceProviderService.receivedRoleReplyResponse);
+
+ clDeviceProviderService.portStatusChanged(DEVICE_ID, portDescription);
+ assertTrue(svDeviceProviderService.portStatusChanged.await(10, TimeUnit.SECONDS));
+ assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.portStatusChangedDid);
+ assertEqualsButNotSame(portDescription, svDeviceProviderService.portStatusChangedDesc);
+
+ Collection<PortStatistics> portStatistics = Collections.emptyList();
+ clDeviceProviderService.updatePortStatistics(DEVICE_ID, portStatistics);
+ assertTrue(svDeviceProviderService.updatePortStatistics.await(10, TimeUnit.SECONDS));
+ assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.updatePortStatisticsDid);
+ assertEqualsButNotSame(portStatistics, svDeviceProviderService.updatePortStatisticsStats);
+
+ clDeviceProviderService.deviceDisconnected(DEVICE_ID);
+ assertTrue(svDeviceProviderService.deviceDisconnected.await(10, TimeUnit.SECONDS));
+ assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.deviceDisconnectedDid);
+
+
+
+ // server to client communication
+ svSideDeviceProvider.triggerProbe(DEVICE_ID);
+ assertTrue(clDeviceProvider.triggerProbe.await(10, TimeUnit.SECONDS));
+ assertEquals(DEVICE_ID, clDeviceProvider.triggerProbeDid);
+ assertNotSame("Cannot be same instance if it properly went through gRPC",
+ DEVICE_ID, clDeviceProvider.triggerProbeDid);
+
+ svSideDeviceProvider.roleChanged(DEVICE_ID, MastershipRole.STANDBY);
+ assertTrue(clDeviceProvider.roleChanged.await(10, TimeUnit.SECONDS));
+ assertEquals(DEVICE_ID, clDeviceProvider.roleChangedDid);
+ assertNotSame("Cannot be same instance if it properly went through gRPC",
+ DEVICE_ID, clDeviceProvider.roleChangedDid);
+ assertEquals(MastershipRole.STANDBY, clDeviceProvider.roleChangedNewRole);
+
+ clDeviceProvider.isReachableReply = false;
+ assertEquals(clDeviceProvider.isReachableReply,
+ svSideDeviceProvider.isReachable(DEVICE_ID));
+ assertTrue(clDeviceProvider.isReachable.await(10, TimeUnit.SECONDS));
+ assertEquals(DEVICE_ID, clDeviceProvider.isReachableDid);
+ assertNotSame("Cannot be same instance if it properly went through gRPC",
+ DEVICE_ID, clDeviceProvider.isReachableDid);
+ }
+
+ /**
+ * Device Provider on CO side.
+ */
+ public class CTestDeviceProvider implements DeviceProvider {
+
+ final CountDownLatch triggerProbe = new CountDownLatch(1);
+ DeviceId triggerProbeDid;
+
+ final CountDownLatch roleChanged = new CountDownLatch(1);
+ DeviceId roleChangedDid;
+ MastershipRole roleChangedNewRole;
+
+ final CountDownLatch isReachable = new CountDownLatch(1);
+ DeviceId isReachableDid;
+ boolean isReachableReply = false;
+
+ @Override
+ public ProviderId id() {
+ return PID;
+ }
+
+ @Override
+ public void triggerProbe(DeviceId deviceId) {
+ log.info("triggerProbe({}) on Client called", deviceId);
+ triggerProbeDid = deviceId;
+ triggerProbe.countDown();
+ }
+
+ @Override
+ public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+ log.info("roleChanged({},{}) on Client called", deviceId, newRole);
+ roleChangedDid = deviceId;
+ roleChangedNewRole = newRole;
+ roleChanged.countDown();
+ }
+
+ @Override
+ public boolean isReachable(DeviceId deviceId) {
+ log.info("isReachable({}) on Client called", deviceId);
+ isReachableDid = deviceId;
+ isReachable.countDown();
+ return isReachableReply;
+ }
+
+ }
+
+ class NoOpRemoteServiceProviderRegistry
+ implements RemoteServiceProviderRegistry {
+
+ @Override
+ public RemoteServiceContextProviderService register(RemoteServiceContextProvider provider) {
+ return new RemoteServiceContextProviderService() {
+
+ @Override
+ public RemoteServiceContextProvider provider() {
+ return provider;
+ }
+ };
+ }
+
+ @Override
+ public void unregister(RemoteServiceContextProvider provider) {
+ }
+
+ @Override
+ public Set<ProviderId> getProviders() {
+ return Collections.emptySet();
+ }
+ }
+
+ /**
+ * DeviceProvider on Metro side.
+ */
+ public class MTestDeviceProviderRegistry
+ extends AbstractProviderRegistry<DeviceProvider, DeviceProviderService>
+ implements DeviceProviderRegistry {
+
+ @Override
+ protected DeviceProviderService createProviderService(DeviceProvider provider) {
+ log.info("createProviderService({})", provider);
+ svSideDeviceProvider = provider;
+ svDeviceProviderService = new MTestDeviceProviderService(provider);
+ serverReady.countDown();
+ return svDeviceProviderService;
+ }
+
+ }
+
+ private final class MTestDeviceProviderService
+ extends AbstractProviderService<DeviceProvider>
+ implements DeviceProviderService {
+
+ public MTestDeviceProviderService(DeviceProvider provider) {
+ super(provider);
+ }
+
+
+ final CountDownLatch deviceConnected = new CountDownLatch(1);
+ DeviceId deviceConnectedDid;
+ DeviceDescription deviceConnectedDesc;
+
+ @Override
+ public void deviceConnected(DeviceId deviceId,
+ DeviceDescription deviceDescription) {
+ log.info("deviceConnected({}, {}) on Server called", deviceId, deviceDescription);
+ deviceConnectedDid = deviceId;
+ deviceConnectedDesc = deviceDescription;
+ deviceConnected.countDown();
+ }
+
+
+ final CountDownLatch updatePorts = new CountDownLatch(1);
+ DeviceId updatePortsDid;
+ List<PortDescription> updatePortsDescs;
+
+ @Override
+ public void updatePorts(DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
+ log.info("updatePorts({}, {}) on Server called", deviceId, portDescriptions);
+ updatePortsDid = deviceId;
+ updatePortsDescs = portDescriptions;
+ updatePorts.countDown();
+ }
+
+ final CountDownLatch receivedRoleReply = new CountDownLatch(1);
+ DeviceId receivedRoleReplyDid;
+ MastershipRole receivedRoleReplyRequested;
+ MastershipRole receivedRoleReplyResponse;
+
+ @Override
+ public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
+ MastershipRole response) {
+ log.info("receivedRoleReply({}, {}, {}) on Server called", deviceId, requested, response);
+ receivedRoleReplyDid = deviceId;
+ receivedRoleReplyRequested = requested;
+ receivedRoleReplyResponse = response;
+ receivedRoleReply.countDown();
+ }
+
+ final CountDownLatch portStatusChanged = new CountDownLatch(1);
+ DeviceId portStatusChangedDid;
+ PortDescription portStatusChangedDesc;
+
+
+ @Override
+ public void portStatusChanged(DeviceId deviceId,
+ PortDescription portDescription) {
+ log.info("portStatusChanged({}, {}) on Server called", deviceId, portDescription);
+ portStatusChangedDid = deviceId;
+ portStatusChangedDesc = portDescription;
+ portStatusChanged.countDown();
+ }
+
+ final CountDownLatch updatePortStatistics = new CountDownLatch(1);
+ DeviceId updatePortStatisticsDid;
+ Collection<PortStatistics> updatePortStatisticsStats;
+
+
+ @Override
+ public void updatePortStatistics(DeviceId deviceId,
+ Collection<PortStatistics> portStatistics) {
+ log.info("updatePortStatistics({}, {}) on Server called", deviceId, portStatistics);
+ updatePortStatisticsDid = deviceId;
+ updatePortStatisticsStats = portStatistics;
+ updatePortStatistics.countDown();
+ }
+
+ final CountDownLatch deviceDisconnected = new CountDownLatch(1);
+ DeviceId deviceDisconnectedDid;
+
+ @Override
+ public void deviceDisconnected(DeviceId deviceId) {
+ log.info("deviceDisconnected({}) on Server called", deviceId);
+ deviceDisconnectedDid = deviceId;
+ deviceDisconnected.countDown();
+ }
+ }
+
+}