aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java')
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java334
1 files changed, 0 insertions, 334 deletions
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
deleted file mode 100644
index c195d160..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio.service;
-
-import static org.onlab.util.Tools.groupedThreads;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.nio.AcceptorLoop;
-import org.onlab.nio.SelectorLoop;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
-
-/**
- * MessagingService implementation based on IOLoop.
- */
-public class IOLoopMessaging implements MessagingService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
-
- static final long TIMEOUT = 1000;
-
- static final boolean SO_NO_DELAY = false;
- static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
- static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
-
- private static final int NUM_WORKERS = 8;
-
- private AcceptorLoop acceptorLoop;
- private final ExecutorService acceptorThreadPool =
- Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
- private final ExecutorService ioThreadPool =
- Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
-
- private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
-
- private int lastWorker = -1;
-
- private final AtomicBoolean started = new AtomicBoolean(false);
- private Endpoint localEp;
-
- private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
- new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
-
- private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
- private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
- .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
- @Override
- public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
- if (entry.wasEvicted()) {
- entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
- }
- }
- })
- .build();
-
- /**
- * Activates IO Loops.
- *
- * @param localEp local end-point
- * @throws IOException is activation fails
- */
- public void start(Endpoint localEp) throws IOException {
- if (started.get()) {
- log.warn("IOMessaging is already running at {}", localEp);
- return;
- }
- this.localEp = localEp;
- streams.setLifo(false);
- this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
-
- for (int i = 0; i < NUM_WORKERS; i++) {
- ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
- }
-
- ioLoops.forEach(ioThreadPool::execute);
- acceptorThreadPool.execute(acceptorLoop);
- ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
- acceptorLoop.awaitStart(TIMEOUT);
- started.set(true);
- }
-
- /**
- * Shuts down IO loops.
- */
- public void stop() {
- if (started.get()) {
- ioLoops.forEach(SelectorLoop::shutdown);
- acceptorLoop.shutdown();
- ioThreadPool.shutdown();
- acceptorThreadPool.shutdown();
- started.set(false);
- }
- }
-
-
- @Override
- public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
- DefaultMessage message = new DefaultMessage(
- messageIdGenerator.incrementAndGet(),
- localEp,
- type,
- payload);
- return sendAsync(ep, message);
- }
-
- protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- if (ep.equals(localEp)) {
- dispatchLocally(message);
- future.complete(null);
- return future;
- }
-
- DefaultMessageStream stream = null;
- try {
- stream = streams.borrowObject(ep);
- stream.write(message);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- } finally {
- try {
- streams.returnObject(ep, stream);
- } catch (Exception e) {
- log.warn("Failed to return stream to pool");
- }
- }
- return future;
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(
- Endpoint ep,
- String type,
- byte[] payload) {
- CompletableFuture<byte[]> response = new CompletableFuture<>();
- Long messageId = messageIdGenerator.incrementAndGet();
- responseFutures.put(messageId, response);
- DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
- try {
- sendAsync(ep, message);
- } catch (Exception e) {
- responseFutures.invalidate(messageId);
- response.completeExceptionally(e);
- }
- return response;
- }
-
- @Override
- public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
- }
-
- @Override
- public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> {
- byte[] responsePayload = handler.apply(message.payload());
- if (responsePayload != null) {
- DefaultMessage response = new DefaultMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- responsePayload);
- sendAsync(message.sender(), response).whenComplete((result, error) -> {
- log.debug("Failed to respond", error);
- });
- }
- }));
- }
-
- @Override
- public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
- handlers.put(type, message -> {
- handler.apply(message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- DefaultMessage response = new DefaultMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- sendAsync(message.sender(), response).whenComplete((r, e) -> {
- if (e != null) {
- log.debug("Failed to respond", e);
- }
- });
- }
- });
- });
- }
-
- @Override
- public void unregisterHandler(String type) {
- handlers.remove(type);
- }
-
- protected void dispatchLocally(DefaultMessage message) {
- String type = message.type();
- if (REPLY_MESSAGE_TYPE.equals(type)) {
- try {
- CompletableFuture<byte[]> futureResponse =
- responseFutures.getIfPresent(message.id());
- if (futureResponse != null) {
- futureResponse.complete(message.payload());
- } else {
- log.warn("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
- }
- } finally {
- responseFutures.invalidate(message.id());
- }
- return;
- }
- Consumer<DefaultMessage> handler = handlers.get(type);
- if (handler != null) {
- handler.accept(message);
- } else {
- log.debug("No handler registered for {}", type);
- }
- }
-
- // Get the next worker to which a client should be assigned
- private synchronized DefaultIOLoop nextWorker() {
- lastWorker = (lastWorker + 1) % NUM_WORKERS;
- return ioLoops.get(lastWorker);
- }
-
- /**
- * Initiates open connection request and registers the pending socket
- * channel with the given IO loop.
- *
- * @param loop loop with which the channel should be registered
- * @throws java.io.IOException if the socket could not be open or connected
- */
- private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
- SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
- SocketChannel ch = SocketChannel.open();
- ch.configureBlocking(false);
- DefaultMessageStream stream = loop.connectStream(ch);
- ch.connect(sa);
- return stream;
- }
-
- // Loop for accepting client connections
- private class DefaultAcceptorLoop extends AcceptorLoop {
-
- public DefaultAcceptorLoop(SocketAddress address) throws IOException {
- super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
- }
-
- @Override
- protected void acceptConnection(ServerSocketChannel channel) throws IOException {
- SocketChannel sc = channel.accept();
- sc.configureBlocking(false);
-
- Socket so = sc.socket();
- so.setTcpNoDelay(SO_NO_DELAY);
- so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
- so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
-
- nextWorker().acceptStream(sc);
- }
- }
-
- private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
-
- @Override
- public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
- }
-
- @Override
- public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
- stream.close();
- }
-
- @Override
- public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
- DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
- log.info("Established a new connection to {}", ep);
- return stream;
- }
-
- @Override
- public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
- }
-
- @Override
- public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
- return stream.isClosed();
- }
- }
-}