diff options
author | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:15:21 -0700 |
---|---|---|
committer | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:15:21 -0700 |
commit | 13d05bc8458758ee39cb829098241e89616717ee (patch) | |
tree | 22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java | |
parent | 6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff) |
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java')
-rw-r--r-- | framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java | 334 |
1 files changed, 334 insertions, 0 deletions
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java new file mode 100644 index 00000000..c195d160 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java @@ -0,0 +1,334 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio.service; + +import static org.onlab.util.Tools.groupedThreads; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.apache.commons.pool.KeyedPoolableObjectFactory; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.onlab.nio.AcceptorLoop; +import org.onlab.nio.SelectorLoop; +import org.onosproject.store.cluster.messaging.Endpoint; +import org.onosproject.store.cluster.messaging.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Lists; + +/** + * MessagingService implementation based on IOLoop. + */ +public class IOLoopMessaging implements MessagingService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY"; + + static final long TIMEOUT = 1000; + + static final boolean SO_NO_DELAY = false; + static final int SO_SEND_BUFFER_SIZE = 128 * 1024; + static final int SO_RCV_BUFFER_SIZE = 128 * 1024; + + private static final int NUM_WORKERS = 8; + + private AcceptorLoop acceptorLoop; + private final ExecutorService acceptorThreadPool = + Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor")); + private final ExecutorService ioThreadPool = + Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d")); + + private final List<DefaultIOLoop> ioLoops = Lists.newArrayList(); + + private int lastWorker = -1; + + private final AtomicBoolean started = new AtomicBoolean(false); + private Endpoint localEp; + + private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams = + new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory()); + + private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>(); + private final AtomicLong messageIdGenerator = new AtomicLong(0); + private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder() + .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() { + @Override + public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) { + if (entry.wasEvicted()) { + entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply")); + } + } + }) + .build(); + + /** + * Activates IO Loops. + * + * @param localEp local end-point + * @throws IOException is activation fails + */ + public void start(Endpoint localEp) throws IOException { + if (started.get()) { + log.warn("IOMessaging is already running at {}", localEp); + return; + } + this.localEp = localEp; + streams.setLifo(false); + this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port())); + + for (int i = 0; i < NUM_WORKERS; i++) { + ioLoops.add(new DefaultIOLoop(this::dispatchLocally)); + } + + ioLoops.forEach(ioThreadPool::execute); + acceptorThreadPool.execute(acceptorLoop); + ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT)); + acceptorLoop.awaitStart(TIMEOUT); + started.set(true); + } + + /** + * Shuts down IO loops. + */ + public void stop() { + if (started.get()) { + ioLoops.forEach(SelectorLoop::shutdown); + acceptorLoop.shutdown(); + ioThreadPool.shutdown(); + acceptorThreadPool.shutdown(); + started.set(false); + } + } + + + @Override + public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) { + DefaultMessage message = new DefaultMessage( + messageIdGenerator.incrementAndGet(), + localEp, + type, + payload); + return sendAsync(ep, message); + } + + protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) { + CompletableFuture<Void> future = new CompletableFuture<>(); + if (ep.equals(localEp)) { + dispatchLocally(message); + future.complete(null); + return future; + } + + DefaultMessageStream stream = null; + try { + stream = streams.borrowObject(ep); + stream.write(message); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } finally { + try { + streams.returnObject(ep, stream); + } catch (Exception e) { + log.warn("Failed to return stream to pool"); + } + } + return future; + } + + @Override + public CompletableFuture<byte[]> sendAndReceive( + Endpoint ep, + String type, + byte[] payload) { + CompletableFuture<byte[]> response = new CompletableFuture<>(); + Long messageId = messageIdGenerator.incrementAndGet(); + responseFutures.put(messageId, response); + DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload); + try { + sendAsync(ep, message); + } catch (Exception e) { + responseFutures.invalidate(messageId); + response.completeExceptionally(e); + } + return response; + } + + @Override + public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) { + handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload()))); + } + + @Override + public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) { + handlers.put(type, message -> executor.execute(() -> { + byte[] responsePayload = handler.apply(message.payload()); + if (responsePayload != null) { + DefaultMessage response = new DefaultMessage(message.id(), + localEp, + REPLY_MESSAGE_TYPE, + responsePayload); + sendAsync(message.sender(), response).whenComplete((result, error) -> { + log.debug("Failed to respond", error); + }); + } + })); + } + + @Override + public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) { + handlers.put(type, message -> { + handler.apply(message.payload()).whenComplete((result, error) -> { + if (error == null) { + DefaultMessage response = new DefaultMessage(message.id(), + localEp, + REPLY_MESSAGE_TYPE, + result); + sendAsync(message.sender(), response).whenComplete((r, e) -> { + if (e != null) { + log.debug("Failed to respond", e); + } + }); + } + }); + }); + } + + @Override + public void unregisterHandler(String type) { + handlers.remove(type); + } + + protected void dispatchLocally(DefaultMessage message) { + String type = message.type(); + if (REPLY_MESSAGE_TYPE.equals(type)) { + try { + CompletableFuture<byte[]> futureResponse = + responseFutures.getIfPresent(message.id()); + if (futureResponse != null) { + futureResponse.complete(message.payload()); + } else { + log.warn("Received a reply for message id:[{}]. " + + " from {}. But was unable to locate the" + + " request handle", message.id(), message.sender()); + } + } finally { + responseFutures.invalidate(message.id()); + } + return; + } + Consumer<DefaultMessage> handler = handlers.get(type); + if (handler != null) { + handler.accept(message); + } else { + log.debug("No handler registered for {}", type); + } + } + + // Get the next worker to which a client should be assigned + private synchronized DefaultIOLoop nextWorker() { + lastWorker = (lastWorker + 1) % NUM_WORKERS; + return ioLoops.get(lastWorker); + } + + /** + * Initiates open connection request and registers the pending socket + * channel with the given IO loop. + * + * @param loop loop with which the channel should be registered + * @throws java.io.IOException if the socket could not be open or connected + */ + private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException { + SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port()); + SocketChannel ch = SocketChannel.open(); + ch.configureBlocking(false); + DefaultMessageStream stream = loop.connectStream(ch); + ch.connect(sa); + return stream; + } + + // Loop for accepting client connections + private class DefaultAcceptorLoop extends AcceptorLoop { + + public DefaultAcceptorLoop(SocketAddress address) throws IOException { + super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address); + } + + @Override + protected void acceptConnection(ServerSocketChannel channel) throws IOException { + SocketChannel sc = channel.accept(); + sc.configureBlocking(false); + + Socket so = sc.socket(); + so.setTcpNoDelay(SO_NO_DELAY); + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); + so.setSendBufferSize(SO_SEND_BUFFER_SIZE); + + nextWorker().acceptStream(sc); + } + } + + private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> { + + @Override + public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception { + } + + @Override + public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception { + stream.close(); + } + + @Override + public DefaultMessageStream makeObject(Endpoint ep) throws Exception { + DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get(); + log.info("Established a new connection to {}", ep); + return stream; + } + + @Override + public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception { + } + + @Override + public boolean validateObject(Endpoint ep, DefaultMessageStream stream) { + return stream.isClosed(); + } + } +} |