diff options
Diffstat (limited to 'framework/src/onos/utils/nio/src/test/java')
9 files changed, 0 insertions, 1398 deletions
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java deleted file mode 100644 index 2b4a85cd..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.Before; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static java.util.concurrent.Executors.newSingleThreadExecutor; -import static org.junit.Assert.fail; -import static org.onlab.util.Tools.namedThreads; - -/** - * Base class for various NIO loop unit tests. - */ -public abstract class AbstractLoopTest { - - protected static final long MAX_MS_WAIT = 1500; - - /** Block on specified countdown latch. Return when countdown reaches - * zero, or fail the test if the {@value #MAX_MS_WAIT} ms timeout expires. - * - * @param latch the latch - * @param label an identifying label - */ - protected void waitForLatch(CountDownLatch latch, String label) { - try { - boolean ok = latch.await(MAX_MS_WAIT, TimeUnit.MILLISECONDS); - if (!ok) { - fail("Latch await timeout! [" + label + "]"); - } - } catch (InterruptedException e) { - System.out.println("Latch interrupt [" + label + "] : " + e); - fail("Unexpected interrupt"); - } - } - - protected ExecutorService exec; - - @Before - public void setUp() { - exec = newSingleThreadExecutor(namedThreads("test")); - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java deleted file mode 100644 index f04a0126..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.util.concurrent.CountDownLatch; - -import static org.junit.Assert.assertEquals; -import static org.onlab.junit.TestTools.delay; - -/** - * Unit tests for AcceptLoop. - */ -public class AcceptorLoopTest extends AbstractLoopTest { - - private static final int PICK_EPHEMERAL = 0; - - private static final SocketAddress SOCK_ADDR = new InetSocketAddress("127.0.0.1", PICK_EPHEMERAL); - - private static class MyAcceptLoop extends AcceptorLoop { - private final CountDownLatch loopStarted = new CountDownLatch(1); - private final CountDownLatch loopFinished = new CountDownLatch(1); - private final CountDownLatch runDone = new CountDownLatch(1); - private final CountDownLatch ceaseLatch = new CountDownLatch(1); - - private int acceptCount = 0; - - MyAcceptLoop() throws IOException { - super(500, SOCK_ADDR); - } - - @Override - protected void acceptConnection(ServerSocketChannel ssc) throws IOException { - acceptCount++; - } - - @Override - public void loop() throws IOException { - loopStarted.countDown(); - super.loop(); - loopFinished.countDown(); - } - - @Override - public void run() { - super.run(); - runDone.countDown(); - } - - @Override - public void shutdown() { - super.shutdown(); - ceaseLatch.countDown(); - } - } - - @Test - public void basic() throws IOException { - MyAcceptLoop myAccLoop = new MyAcceptLoop(); - AcceptorLoop accLoop = myAccLoop; - exec.execute(accLoop); - waitForLatch(myAccLoop.loopStarted, "loopStarted"); - delay(200); // take a quick nap - accLoop.shutdown(); - waitForLatch(myAccLoop.loopFinished, "loopFinished"); - waitForLatch(myAccLoop.runDone, "runDone"); - assertEquals(0, myAccLoop.acceptCount); - } -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java deleted file mode 100644 index f42c8dc5..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.net.InetAddress; -import java.util.Random; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static org.onlab.junit.TestTools.delay; - -/** - * Integration test for the select, accept and IO loops. - */ -public class IOLoopIntegrationTest { - - private static final int THREADS = 6; - private static final int TIMEOUT = 60; - private static final int MESSAGE_LENGTH = 128; - - private static final int MILLION = 1000000; - private static final int MSG_COUNT = 40 * MILLION; - - @Before - public void warmUp() throws Exception { - Logger.getLogger("").setLevel(Level.SEVERE); - try { - runTest(MILLION, MESSAGE_LENGTH, 15); - } catch (Throwable e) { - System.err.println("Failed warmup but moving on."); - e.printStackTrace(); - } - } - - // TODO: this test can not pass in some environments, need to be improved - @Ignore - @Test - public void basic() throws Exception { - runTest(MILLION, MESSAGE_LENGTH, TIMEOUT); - } - - public void longHaul() throws Exception { - runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT); - } - - private void runTest(int count, int size, int timeout) throws Exception { - // Use a random port to prevent conflicts. - int port = IOLoopTestServer.PORT + new Random().nextInt(100); - - InetAddress ip = InetAddress.getLoopbackAddress(); - IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port); - IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port); - - server.start(); - client.start(); - delay(100); // Pause to allow loops to get going - - client.await(timeout); - client.report(); - - server.stop(); - server.report(); - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java deleted file mode 100644 index acc9a08f..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import com.google.common.collect.Lists; -import org.onlab.util.Counter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.lang.String.format; -import static java.lang.System.nanoTime; -import static java.lang.System.out; -import static org.onlab.nio.IOLoopTestServer.PORT; -import static org.onlab.util.Tools.delay; -import static org.onlab.util.Tools.namedThreads; - -/** - * Auxiliary test fixture to measure speed of NIO-based channels. - */ -public class IOLoopTestClient { - - private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class); - - private final InetAddress ip; - private final int port; - private final int msgCount; - private final int msgLength; - - private final List<CustomIOLoop> iloops = new ArrayList<>(); - private final ExecutorService ipool; - private final ExecutorService wpool; - - Counter messages; - Counter bytes; - long latencyTotal = 0; - long latencyCount = 0; - - - /** - * Main entry point to launch the client. - * - * @param args command-line arguments - * @throws java.io.IOException if unable to connect to server - * @throws InterruptedException if latch wait gets interrupted - * @throws java.util.concurrent.ExecutionException if wait gets interrupted - * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion - */ - public static void main(String[] args) - throws IOException, InterruptedException, ExecutionException, TimeoutException { - startStandalone(args); - - System.exit(0); - } - - /** - * Starts a standalone IO loop test client. - * - * @param args command-line arguments - */ - public static void startStandalone(String[] args) - throws IOException, InterruptedException, ExecutionException, TimeoutException { - InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1"); - int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; - int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000; - int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128; - int to = args.length > 4 ? Integer.parseInt(args[4]) : 60; - - log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ", - wc, mc, ml, ip); - IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT); - - client.start(); - delay(500); - - client.await(to); - client.report(); - } - - /** - * Creates a speed client. - * - * @param ip ip address of server - * @param wc worker count - * @param mc message count to send per client - * @param ml message length in bytes - * @param port socket port - * @throws java.io.IOException if unable to create IO loops - */ - public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException { - this.ip = ip; - this.port = port; - this.msgCount = mc; - this.msgLength = ml; - this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker")); - this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop")); - - for (int i = 0; i < wc; i++) { - iloops.add(new CustomIOLoop()); - } - } - - /** - * Starts the client workers. - * - * @throws java.io.IOException if unable to open connection - */ - public void start() throws IOException { - messages = new Counter(); - bytes = new Counter(); - - // First start up all the IO loops - for (CustomIOLoop l : iloops) { - ipool.execute(l); - } - - // Wait for all of them to get going - for (CustomIOLoop l : iloops) { - l.awaitStart(1000); - } - - // ... and Next open all connections; one-per-loop - for (CustomIOLoop l : iloops) { - openConnection(l); - } - } - - - /** - * Initiates open connection request and registers the pending socket - * channel with the given IO loop. - * - * @param loop loop with which the channel should be registered - * @throws java.io.IOException if the socket could not be open or connected - */ - private void openConnection(CustomIOLoop loop) throws IOException { - SocketAddress sa = new InetSocketAddress(ip, port); - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - loop.connectStream(ch); - ch.connect(sa); - } - - - /** - * Waits for the client workers to complete. - * - * @param secs timeout in seconds - * @throws java.util.concurrent.ExecutionException if execution failed - * @throws InterruptedException if interrupt occurred while waiting - * @throws java.util.concurrent.TimeoutException if timeout occurred - */ - public void await(int secs) throws InterruptedException, - ExecutionException, TimeoutException { - for (CustomIOLoop l : iloops) { - if (l.worker.task != null) { - l.worker.task.get(secs, TimeUnit.SECONDS); - latencyTotal += l.latencyTotal; - latencyCount += l.latencyCount; - } - } - messages.freeze(); - bytes.freeze(); - } - - /** - * Reports on the accumulated throughput and latency. - */ - public void report() { - DecimalFormat f = new DecimalFormat("#,##0"); - out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency", - f.format(messages.total()), f.format(bytes.total()), - f.format(messages.throughput()), - f.format(bytes.throughput() / (1024 * msgLength)), - f.format(latencyTotal / latencyCount))); - } - - - // Loop for transfer of fixed-length messages - private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> { - - Worker worker = new Worker(); - long latencyTotal = 0; - long latencyCount = 0; - - - public CustomIOLoop() throws IOException { - super(500); - } - - - @Override - protected TestMessageStream createStream(ByteChannel channel) { - return new TestMessageStream(msgLength, channel, this); - } - - @Override - protected synchronized void removeStream(MessageStream<TestMessage> stream) { - super.removeStream(stream); - messages.add(stream.messagesIn().total()); - bytes.add(stream.bytesIn().total()); - stream.messagesOut().reset(); - stream.bytesOut().reset(); - } - - @Override - protected void processMessages(List<TestMessage> messages, - MessageStream<TestMessage> stream) { - for (TestMessage message : messages) { - // TODO: summarize latency data better - latencyTotal += nanoTime() - message.requestorTime(); - latencyCount++; - } - worker.release(messages.size()); - } - - @Override - protected void connect(SelectionKey key) throws IOException { - super.connect(key); - TestMessageStream b = (TestMessageStream) key.attachment(); - Worker w = ((CustomIOLoop) b.loop()).worker; - w.pump(b); - } - - } - - /** - * Auxiliary worker to connect and pump batched messages using blocking I/O. - */ - private class Worker implements Runnable { - - private static final int BATCH_SIZE = 50; - private static final int PERMITS = 2 * BATCH_SIZE; - - private TestMessageStream stream; - private FutureTask<Worker> task; - - // Stuff to throttle pump - private final Semaphore semaphore = new Semaphore(PERMITS); - private int msgWritten; - - void pump(TestMessageStream stream) { - this.stream = stream; - task = new FutureTask<>(this, this); - wpool.execute(task); - } - - @Override - public void run() { - try { - log.info("Worker started..."); - - while (msgWritten < msgCount) { - int size = Math.min(BATCH_SIZE, msgCount - msgWritten); - writeBatch(size); - msgWritten += size; - } - - // Now try to get all the permits back before sending poison pill - semaphore.acquireUninterruptibly(PERMITS); - stream.close(); - - log.info("Worker done..."); - - } catch (IOException e) { - log.error("Worker unable to perform I/O", e); - } - } - - - private void writeBatch(int size) throws IOException { - // Build a batch of messages - List<TestMessage> batch = Lists.newArrayListWithCapacity(size); - for (int i = 0; i < size; i++) { - batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding())); - } - acquire(size); - stream.write(batch); - } - - - // Release permits based on the specified number of message credits - private void release(int permits) { - semaphore.release(permits); - } - - // Acquire permit for a single batch - private void acquire(int permits) { - semaphore.acquireUninterruptibly(permits); - } - - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java deleted file mode 100644 index d5ce5f39..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import com.google.common.collect.Lists; -import org.onlab.util.Counter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.ByteChannel; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static java.lang.String.format; -import static java.lang.System.out; -import static org.onlab.util.Tools.delay; -import static org.onlab.util.Tools.namedThreads; - -/** - * Auxiliary test fixture to measure speed of NIO-based channels. - */ -public class IOLoopTestServer { - - private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class); - - private static final int PRUNE_FREQUENCY = 1000; - - static final int PORT = 9876; - static final long TIMEOUT = 1000; - - static final boolean SO_NO_DELAY = false; - static final int SO_SEND_BUFFER_SIZE = 128 * 1024; - static final int SO_RCV_BUFFER_SIZE = 128 * 1024; - - static final DecimalFormat FORMAT = new DecimalFormat("#,##0"); - - private final AcceptorLoop aloop; - private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept")); - - private final List<CustomIOLoop> iloops = new ArrayList<>(); - private final ExecutorService ipool; - - private final int workerCount; - private final int msgLength; - private int lastWorker = -1; - - Counter messages; - Counter bytes; - - /** - * Main entry point to launch the server. - * - * @param args command-line arguments - * @throws java.io.IOException if unable to crate IO loops - */ - public static void main(String[] args) throws IOException { - startStandalone(args); - System.exit(0); - } - - /** - * Starts a standalone IO loop test server. - * - * @param args command-line arguments - */ - public static void startStandalone(String[] args) throws IOException { - InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1"); - int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; - int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128; - - log.info("Setting up the server with {} workers, {} byte messages on {}... ", - wc, ml, ip); - IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); - server.start(); - - // Start pruning clients and keep going until their number goes to 0. - int remaining = -1; - while (remaining == -1 || remaining > 0) { - delay(PRUNE_FREQUENCY); - int r = server.prune(); - remaining = remaining == -1 && r == 0 ? remaining : r; - } - server.stop(); - } - - /** - * Creates a speed server. - * - * @param ip optional ip of the adapter where to bind - * @param wc worker count - * @param ml message length in bytes - * @param port listen port - * @throws java.io.IOException if unable to create IO loops - */ - public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException { - this.workerCount = wc; - this.msgLength = ml; - this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop")); - - this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port)); - for (int i = 0; i < workerCount; i++) { - iloops.add(new CustomIOLoop()); - } - } - - /** - * Start the server IO loops and kicks off throughput tracking. - */ - public void start() { - messages = new Counter(); - bytes = new Counter(); - - for (CustomIOLoop l : iloops) { - ipool.execute(l); - } - apool.execute(aloop); - - for (CustomIOLoop l : iloops) { - l.awaitStart(TIMEOUT); - } - aloop.awaitStart(TIMEOUT); - } - - /** - * Stop the server IO loops and freezes throughput tracking. - */ - public void stop() { - aloop.shutdown(); - for (CustomIOLoop l : iloops) { - l.shutdown(); - } - - for (CustomIOLoop l : iloops) { - l.awaitStop(TIMEOUT); - } - aloop.awaitStop(TIMEOUT); - - messages.freeze(); - bytes.freeze(); - } - - /** - * Reports on the accumulated throughput and latency. - */ - public void report() { - DecimalFormat f = new DecimalFormat("#,##0"); - out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs", - f.format(messages.total()), f.format(bytes.total()), - f.format(messages.throughput()), - f.format(bytes.throughput() / (1024 * msgLength)))); - } - - /** - * Prunes the IO loops of stale message buffers. - * - * @return number of remaining IO loops among all workers. - */ - public int prune() { - int count = 0; - for (CustomIOLoop l : iloops) { - count += l.pruneStaleStreams(); - } - return count; - } - - // Get the next worker to which a client should be assigned - private synchronized CustomIOLoop nextWorker() { - lastWorker = (lastWorker + 1) % workerCount; - return iloops.get(lastWorker); - } - - // Loop for transfer of fixed-length messages - private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> { - - public CustomIOLoop() throws IOException { - super(500); - } - - @Override - protected TestMessageStream createStream(ByteChannel channel) { - return new TestMessageStream(msgLength, channel, this); - } - - @Override - protected void removeStream(MessageStream<TestMessage> stream) { - super.removeStream(stream); - messages.add(stream.messagesIn().total()); - bytes.add(stream.bytesIn().total()); - } - - @Override - protected void processMessages(List<TestMessage> messages, - MessageStream<TestMessage> stream) { - try { - stream.write(createResponses(messages)); - } catch (IOException e) { - log.error("Unable to echo messages", e); - } - } - - private List<TestMessage> createResponses(List<TestMessage> messages) { - List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size()); - for (TestMessage message : messages) { - responses.add(new TestMessage(message.length(), message.requestorTime(), - System.nanoTime(), message.padding())); - } - return responses; - } - } - - // Loop for accepting client connections - private class CustomAcceptLoop extends AcceptorLoop { - - public CustomAcceptLoop(SocketAddress address) throws IOException { - super(500, address); - } - - @Override - protected void acceptConnection(ServerSocketChannel channel) throws IOException { - SocketChannel sc = channel.accept(); - sc.configureBlocking(false); - - Socket so = sc.socket(); - so.setTcpNoDelay(SO_NO_DELAY); - so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); - so.setSendBufferSize(SO_SEND_BUFFER_SIZE); - - nextWorker().acceptStream(sc); - log.info("Connected client"); - } - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java deleted file mode 100644 index 9965d389..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -/** - * Tests of the message message stream implementation. - */ -public class MessageStreamTest { - - private static final int SIZE = 64; - private static final int BIG_SIZE = 32 * 1024; - - private TestMessage message; - - private TestIOLoop loop; - private TestByteChannel channel; - private TestMessageStream stream; - private TestKey key; - - @Before - public void setUp() throws IOException { - loop = new TestIOLoop(); - channel = new TestByteChannel(); - key = new TestKey(channel); - stream = loop.createStream(channel); - stream.setKey(key); - stream.setNonStrict(); - message = new TestMessage(SIZE, 0, 0, stream.padding()); - } - - @After - public void tearDown() { - loop.shutdown(); - stream.close(); - } - - // Validates the state of the message stream - private void validate(boolean wp, boolean fr, int read, int written) { - assertEquals(wp, stream.isWritePending()); - assertEquals(fr, stream.isFlushRequired()); - assertEquals(read, channel.readBytes); - assertEquals(written, channel.writtenBytes); - } - - @Test - public void endOfStream() throws IOException { - channel.close(); - List<TestMessage> messages = stream.read(); - assertNull(messages); - } - - @Test - public void bufferGrowth() throws IOException { - // Create a stream for big messages and test the growth. - stream = new TestMessageStream(BIG_SIZE, channel, loop); - TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding()); - - stream.write(bigMessage); - stream.write(bigMessage); - stream.write(bigMessage); - stream.write(bigMessage); - stream.write(bigMessage); - } - - @Test - public void discardBeforeKey() { - // Create a stream that does not yet have the key set and discard it. - stream = loop.createStream(channel); - assertNull(stream.key()); - stream.close(); - // There is not key, so nothing to check; we just expect no problem. - } - - @Test - public void bufferedRead() throws IOException { - channel.bytesToRead = SIZE + 4; - List<TestMessage> messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE + 4, 0); - - channel.bytesToRead = SIZE - 4; - messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE * 2, 0); - } - - @Test - public void bufferedWrite() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - stream.write(message); - validate(false, false, 0, SIZE); - - // Second and third get buffered... - stream.write(message); - validate(false, true, 0, SIZE); - stream.write(message); - validate(false, true, 0, SIZE); - - // Reset write, which will flush if needed; the next write is again buffered - stream.flushIfWriteNotPending(); - validate(false, false, 0, SIZE * 3); - stream.write(message); - validate(false, true, 0, SIZE * 3); - - // Select reset, which will flush if needed; the next write is again buffered - stream.flushIfPossible(); - validate(false, false, 0, SIZE * 4); - stream.write(message); - validate(false, true, 0, SIZE * 4); - stream.flush(); - validate(false, true, 0, SIZE * 4); - } - - @Test - public void bufferedWriteList() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - List<TestMessage> messages = new ArrayList<>(); - messages.add(message); - messages.add(message); - messages.add(message); - messages.add(message); - - stream.write(messages); - validate(false, false, 0, SIZE * 4); - - stream.write(messages); - validate(false, true, 0, SIZE * 4); - - stream.flushIfPossible(); - validate(false, false, 0, SIZE * 8); - } - - @Test - public void bufferedPartialWrite() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - stream.write(message); - validate(false, false, 0, SIZE); - - // Tell test channel to accept only half. - channel.bytesToWrite = SIZE / 2; - - // Second and third get buffered... - stream.write(message); - validate(false, true, 0, SIZE); - stream.flushIfPossible(); - validate(true, true, 0, SIZE + SIZE / 2); - } - - @Test - public void bufferedPartialWrite2() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - stream.write(message); - validate(false, false, 0, SIZE); - - // Tell test channel to accept only half. - channel.bytesToWrite = SIZE / 2; - - // Second and third get buffered... - stream.write(message); - validate(false, true, 0, SIZE); - stream.flushIfWriteNotPending(); - validate(true, true, 0, SIZE + SIZE / 2); - } - - @Test - public void bufferedReadWrite() throws IOException { - channel.bytesToRead = SIZE + 4; - List<TestMessage> messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE + 4, 0); - - stream.write(message); - validate(false, false, SIZE + 4, SIZE); - - channel.bytesToRead = SIZE - 4; - messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE * 2, SIZE); - } - - // Fake IO driver loop - private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> { - - public TestIOLoop() throws IOException { - super(500); - } - - @Override - protected TestMessageStream createStream(ByteChannel channel) { - return new TestMessageStream(SIZE, channel, this); - } - - @Override - protected void processMessages(List<TestMessage> messages, - MessageStream<TestMessage> stream) { - } - - } - - // Byte channel test fixture - private static class TestByteChannel extends SelectableChannel implements ByteChannel { - - private static final int BUFFER_LENGTH = 1024; - byte[] bytes = new byte[BUFFER_LENGTH]; - int bytesToWrite = BUFFER_LENGTH; - int bytesToRead = BUFFER_LENGTH; - int writtenBytes = 0; - int readBytes = 0; - - @Override - public int read(ByteBuffer dst) throws IOException { - int l = Math.min(dst.remaining(), bytesToRead); - if (bytesToRead > 0) { - readBytes += l; - dst.put(bytes, 0, l); - } - return l; - } - - @Override - public int write(ByteBuffer src) throws IOException { - int l = Math.min(src.remaining(), bytesToWrite); - writtenBytes += l; - src.get(bytes, 0, l); - return l; - } - - @Override - public Object blockingLock() { - return null; - } - - @Override - public SelectableChannel configureBlocking(boolean arg0) throws IOException { - return null; - } - - @Override - public boolean isBlocking() { - return false; - } - - @Override - public boolean isRegistered() { - return false; - } - - @Override - public SelectionKey keyFor(Selector arg0) { - return null; - } - - @Override - public SelectorProvider provider() { - return null; - } - - @Override - public SelectionKey register(Selector arg0, int arg1, Object arg2) - throws ClosedChannelException { - return null; - } - - @Override - public int validOps() { - return 0; - } - - @Override - protected void implCloseChannel() throws IOException { - bytesToRead = -1; - } - - } - - // Selection key text fixture - private static class TestKey extends SelectionKey { - - private SelectableChannel channel; - - public TestKey(TestByteChannel channel) { - this.channel = channel; - } - - @Override - public void cancel() { - } - - @Override - public SelectableChannel channel() { - return channel; - } - - @Override - public int interestOps() { - return 0; - } - - @Override - public SelectionKey interestOps(int ops) { - return null; - } - - @Override - public boolean isValid() { - return true; - } - - @Override - public int readyOps() { - return 0; - } - - @Override - public Selector selector() { - return null; - } - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java deleted file mode 100644 index cf5b2bb9..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.AbstractSelectableChannel; -import java.nio.channels.spi.AbstractSelector; -import java.util.Set; - -/** - * A selector instrumented for unit tests. - */ -public class MockSelector extends AbstractSelector { - - int wakeUpCount = 0; - - /** - * Creates a mock selector, specifying null as the SelectorProvider. - */ - public MockSelector() { - super(null); - } - - @Override - public String toString() { - return "{MockSelector: wake=" + wakeUpCount + "}"; - } - - @Override - protected void implCloseSelector() throws IOException { - } - - @Override - protected SelectionKey register(AbstractSelectableChannel ch, int ops, - Object att) { - return null; - } - - @Override - public Set<SelectionKey> keys() { - return null; - } - - @Override - public Set<SelectionKey> selectedKeys() { - return null; - } - - @Override - public int selectNow() throws IOException { - return 0; - } - - @Override - public int select(long timeout) throws IOException { - return 0; - } - - @Override - public int select() throws IOException { - return 0; - } - - @Override - public Selector wakeup() { - wakeUpCount++; - return null; - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java deleted file mode 100644 index 825479b5..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Test message for measuring rate and round-trip latency. - */ -public class TestMessage extends AbstractMessage { - - private final byte[] padding; - - private final long requestorTime; - private final long responderTime; - - /** - * Creates a new message with the specified data. - * - * @param requestorTime requester time - * @param responderTime responder time - * @param padding message padding - */ - TestMessage(int length, long requestorTime, long responderTime, byte[] padding) { - this.length = length; - this.requestorTime = requestorTime; - this.responderTime = responderTime; - this.padding = checkNotNull(padding, "Padding cannot be null"); - } - - public long requestorTime() { - return requestorTime; - } - - public long responderTime() { - return responderTime; - } - - public byte[] padding() { - return padding; - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java deleted file mode 100644 index fc86beb9..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -/** - * Fixed-length message transfer buffer. - */ -public class TestMessageStream extends MessageStream<TestMessage> { - - private static final String E_WRONG_LEN = "Illegal message length: "; - private static final long START_TAG = 0xfeedcafedeaddeedL; - private static final long END_TAG = 0xbeadcafedeaddeedL; - private static final int META_LENGTH = 40; - - private final int length; - private boolean isStrict = true; - - public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) { - super(loop, ch, 64 * 1024, 500); - checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40"); - this.length = length; - } - - void setNonStrict() { - isStrict = false; - } - - @Override - protected TestMessage read(ByteBuffer rb) { - if (rb.remaining() < length) { - return null; - } - - long startTag = rb.getLong(); - if (isStrict) { - checkState(startTag == START_TAG, "Incorrect message start"); - } - - long size = rb.getLong(); - long requestorTime = rb.getLong(); - long responderTime = rb.getLong(); - byte[] padding = padding(); - rb.get(padding); - - long endTag = rb.getLong(); - if (isStrict) { - checkState(endTag == END_TAG, "Incorrect message end"); - } - - return new TestMessage((int) size, requestorTime, responderTime, padding); - } - - @Override - protected void write(TestMessage message, ByteBuffer wb) { - if (message.length() != length) { - throw new IllegalArgumentException(E_WRONG_LEN + message.length()); - } - - wb.putLong(START_TAG); - wb.putLong(message.length()); - wb.putLong(message.requestorTime()); - wb.putLong(message.responderTime()); - wb.put(message.padding(), 0, length - META_LENGTH); - wb.putLong(END_TAG); - } - - public byte[] padding() { - return new byte[length - META_LENGTH]; - } -} |