aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java261
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java55
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java89
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/package-info.java20
4 files changed, 0 insertions, 425 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
deleted file mode 100644
index 8a237ef0..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-@Component(immediate = true)
-@Service
-public class ClusterCommunicationManager
- implements ClusterCommunicationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
-
- private NodeId localNodeId;
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public <M> void broadcast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> void broadcastIncludeSelf(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> CompletableFuture<Void> unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId) {
- try {
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- encoder.apply(message)).getBytes();
- return doUnicast(subject, payload, toNodeId);
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- @Override
- public <M> void multicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Set<NodeId> nodes) {
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- encoder.apply(message)).getBytes();
- nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
- }
-
- @Override
- public <M, R> CompletableFuture<R> sendAndReceive(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Function<byte[], R> decoder,
- NodeId toNodeId) {
- try {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- encoder.apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- return messagingService.sendAsync(nodeEp, subject.value(), payload);
- }
-
- private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber,
- ExecutorService executor) {
- messagingService.registerHandler(subject.value(),
- new InternalClusterMessageHandler(subscriber),
- executor);
- }
-
- @Override
- public void removeSubscriber(MessageSubject subject) {
- messagingService.unregisterHandler(subject.value());
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, R> handler,
- Function<R, byte[]> encoder,
- Executor executor) {
- messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<M, R>(decoder, encoder, m -> {
- CompletableFuture<R> responseFuture = new CompletableFuture<>();
- executor.execute(() -> {
- try {
- responseFuture.complete(handler.apply(m));
- } catch (Exception e) {
- responseFuture.completeExceptionally(e);
- }
- });
- return responseFuture;
- }));
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, CompletableFuture<R>> handler,
- Function<R, byte[]> encoder) {
- messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<>(decoder, encoder, handler));
- }
-
- @Override
- public <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Consumer<M> handler,
- Executor executor) {
- messagingService.registerHandler(subject.value(),
- new InternalMessageConsumer<>(decoder, handler),
- executor);
- }
-
- private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
- private ClusterMessageHandler handler;
-
- public InternalClusterMessageHandler(ClusterMessageHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public byte[] apply(byte[] bytes) {
- ClusterMessage message = ClusterMessage.fromBytes(bytes);
- handler.handle(message);
- return message.response();
- }
- }
-
- private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
- private final Function<byte[], M> decoder;
- private final Function<R, byte[]> encoder;
- private final Function<M, CompletableFuture<R>> handler;
-
- public InternalMessageResponder(Function<byte[], M> decoder,
- Function<R, byte[]> encoder,
- Function<M, CompletableFuture<R>> handler) {
- this.decoder = decoder;
- this.encoder = encoder;
- this.handler = handler;
- }
-
- @Override
- public CompletableFuture<byte[]> apply(byte[] bytes) {
- return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
- }
- }
-
- private class InternalMessageConsumer<M> implements Consumer<byte[]> {
- private final Function<byte[], M> decoder;
- private final Consumer<M> consumer;
-
- public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
- this.decoder = decoder;
- this.consumer = consumer;
- }
-
- @Override
- public void accept(byte[] bytes) {
- consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java
deleted file mode 100644
index ddb45f71..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.nio.service.IOLoopMessaging;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * IOLoop based MessagingService.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class IOLoopMessagingManager extends IOLoopMessaging {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Activate
- public void activate() throws Exception {
- ControllerNode localNode = clusterMetadataService.getLocalNode();
- super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() throws Exception {
- super.stop();
- log.info("Stopped");
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
deleted file mode 100644
index ca6f9c1c..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.Strings;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.NettyMessaging;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Netty based MessagingService.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class NettyMessagingManager extends NettyMessaging {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final short MIN_KS_LENGTH = 6;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Activate
- public void activate() throws Exception {
- ControllerNode localNode = clusterMetadataService.getLocalNode();
- getTlsParameters();
- super.start(clusterMetadataService.getClusterMetadata().getName().hashCode(),
- new Endpoint(localNode.ip(), localNode.tcpPort()));
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() throws Exception {
- super.stop();
- log.info("Stopped");
- }
-
- private void getTlsParameters() {
- String tempString = System.getProperty("enableNettyTLS");
- enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString);
- log.info("enableNettyTLS = {}", enableNettyTls);
- if (enableNettyTls) {
- ksLocation = System.getProperty("javax.net.ssl.keyStore");
- if (Strings.isNullOrEmpty(ksLocation)) {
- enableNettyTls = TLS_DISABLED;
- return;
- }
- tsLocation = System.getProperty("javax.net.ssl.trustStore");
- if (Strings.isNullOrEmpty(tsLocation)) {
- enableNettyTls = TLS_DISABLED;
- return;
- }
- ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
- if (MIN_KS_LENGTH > ksPwd.length) {
- enableNettyTls = TLS_DISABLED;
- return;
- }
- tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
- if (MIN_KS_LENGTH > tsPwd.length) {
- enableNettyTls = TLS_DISABLED;
- return;
- }
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/package-info.java
deleted file mode 100644
index 7157277e..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of the cluster messaging mechanism.
- */
-package org.onosproject.store.cluster.messaging.impl;