summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/nio/src/test/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/nio/src/test/java/org')
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java60
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java87
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java82
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java324
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java256
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java359
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java85
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java56
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java89
9 files changed, 0 insertions, 1398 deletions
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java
deleted file mode 100644
index 2b4a85cd..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import org.junit.Before;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.junit.Assert.fail;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Base class for various NIO loop unit tests.
- */
-public abstract class AbstractLoopTest {
-
- protected static final long MAX_MS_WAIT = 1500;
-
- /** Block on specified countdown latch. Return when countdown reaches
- * zero, or fail the test if the {@value #MAX_MS_WAIT} ms timeout expires.
- *
- * @param latch the latch
- * @param label an identifying label
- */
- protected void waitForLatch(CountDownLatch latch, String label) {
- try {
- boolean ok = latch.await(MAX_MS_WAIT, TimeUnit.MILLISECONDS);
- if (!ok) {
- fail("Latch await timeout! [" + label + "]");
- }
- } catch (InterruptedException e) {
- System.out.println("Latch interrupt [" + label + "] : " + e);
- fail("Unexpected interrupt");
- }
- }
-
- protected ExecutorService exec;
-
- @Before
- public void setUp() {
- exec = newSingleThreadExecutor(namedThreads("test"));
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java
deleted file mode 100644
index f04a0126..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.onlab.junit.TestTools.delay;
-
-/**
- * Unit tests for AcceptLoop.
- */
-public class AcceptorLoopTest extends AbstractLoopTest {
-
- private static final int PICK_EPHEMERAL = 0;
-
- private static final SocketAddress SOCK_ADDR = new InetSocketAddress("127.0.0.1", PICK_EPHEMERAL);
-
- private static class MyAcceptLoop extends AcceptorLoop {
- private final CountDownLatch loopStarted = new CountDownLatch(1);
- private final CountDownLatch loopFinished = new CountDownLatch(1);
- private final CountDownLatch runDone = new CountDownLatch(1);
- private final CountDownLatch ceaseLatch = new CountDownLatch(1);
-
- private int acceptCount = 0;
-
- MyAcceptLoop() throws IOException {
- super(500, SOCK_ADDR);
- }
-
- @Override
- protected void acceptConnection(ServerSocketChannel ssc) throws IOException {
- acceptCount++;
- }
-
- @Override
- public void loop() throws IOException {
- loopStarted.countDown();
- super.loop();
- loopFinished.countDown();
- }
-
- @Override
- public void run() {
- super.run();
- runDone.countDown();
- }
-
- @Override
- public void shutdown() {
- super.shutdown();
- ceaseLatch.countDown();
- }
- }
-
- @Test
- public void basic() throws IOException {
- MyAcceptLoop myAccLoop = new MyAcceptLoop();
- AcceptorLoop accLoop = myAccLoop;
- exec.execute(accLoop);
- waitForLatch(myAccLoop.loopStarted, "loopStarted");
- delay(200); // take a quick nap
- accLoop.shutdown();
- waitForLatch(myAccLoop.loopFinished, "loopFinished");
- waitForLatch(myAccLoop.runDone, "runDone");
- assertEquals(0, myAccLoop.acceptCount);
- }
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
deleted file mode 100644
index f42c8dc5..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.Random;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import static org.onlab.junit.TestTools.delay;
-
-/**
- * Integration test for the select, accept and IO loops.
- */
-public class IOLoopIntegrationTest {
-
- private static final int THREADS = 6;
- private static final int TIMEOUT = 60;
- private static final int MESSAGE_LENGTH = 128;
-
- private static final int MILLION = 1000000;
- private static final int MSG_COUNT = 40 * MILLION;
-
- @Before
- public void warmUp() throws Exception {
- Logger.getLogger("").setLevel(Level.SEVERE);
- try {
- runTest(MILLION, MESSAGE_LENGTH, 15);
- } catch (Throwable e) {
- System.err.println("Failed warmup but moving on.");
- e.printStackTrace();
- }
- }
-
- // TODO: this test can not pass in some environments, need to be improved
- @Ignore
- @Test
- public void basic() throws Exception {
- runTest(MILLION, MESSAGE_LENGTH, TIMEOUT);
- }
-
- public void longHaul() throws Exception {
- runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT);
- }
-
- private void runTest(int count, int size, int timeout) throws Exception {
- // Use a random port to prevent conflicts.
- int port = IOLoopTestServer.PORT + new Random().nextInt(100);
-
- InetAddress ip = InetAddress.getLoopbackAddress();
- IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port);
- IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port);
-
- server.start();
- client.start();
- delay(100); // Pause to allow loops to get going
-
- client.await(timeout);
- client.report();
-
- server.stop();
- server.report();
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
deleted file mode 100644
index acc9a08f..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import com.google.common.collect.Lists;
-import org.onlab.util.Counter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.lang.String.format;
-import static java.lang.System.nanoTime;
-import static java.lang.System.out;
-import static org.onlab.nio.IOLoopTestServer.PORT;
-import static org.onlab.util.Tools.delay;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Auxiliary test fixture to measure speed of NIO-based channels.
- */
-public class IOLoopTestClient {
-
- private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
-
- private final InetAddress ip;
- private final int port;
- private final int msgCount;
- private final int msgLength;
-
- private final List<CustomIOLoop> iloops = new ArrayList<>();
- private final ExecutorService ipool;
- private final ExecutorService wpool;
-
- Counter messages;
- Counter bytes;
- long latencyTotal = 0;
- long latencyCount = 0;
-
-
- /**
- * Main entry point to launch the client.
- *
- * @param args command-line arguments
- * @throws java.io.IOException if unable to connect to server
- * @throws InterruptedException if latch wait gets interrupted
- * @throws java.util.concurrent.ExecutionException if wait gets interrupted
- * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
- */
- public static void main(String[] args)
- throws IOException, InterruptedException, ExecutionException, TimeoutException {
- startStandalone(args);
-
- System.exit(0);
- }
-
- /**
- * Starts a standalone IO loop test client.
- *
- * @param args command-line arguments
- */
- public static void startStandalone(String[] args)
- throws IOException, InterruptedException, ExecutionException, TimeoutException {
- InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
- int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
- int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
- int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
- int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
-
- log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
- wc, mc, ml, ip);
- IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
-
- client.start();
- delay(500);
-
- client.await(to);
- client.report();
- }
-
- /**
- * Creates a speed client.
- *
- * @param ip ip address of server
- * @param wc worker count
- * @param mc message count to send per client
- * @param ml message length in bytes
- * @param port socket port
- * @throws java.io.IOException if unable to create IO loops
- */
- public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
- this.ip = ip;
- this.port = port;
- this.msgCount = mc;
- this.msgLength = ml;
- this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
- this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
-
- for (int i = 0; i < wc; i++) {
- iloops.add(new CustomIOLoop());
- }
- }
-
- /**
- * Starts the client workers.
- *
- * @throws java.io.IOException if unable to open connection
- */
- public void start() throws IOException {
- messages = new Counter();
- bytes = new Counter();
-
- // First start up all the IO loops
- for (CustomIOLoop l : iloops) {
- ipool.execute(l);
- }
-
- // Wait for all of them to get going
- for (CustomIOLoop l : iloops) {
- l.awaitStart(1000);
- }
-
- // ... and Next open all connections; one-per-loop
- for (CustomIOLoop l : iloops) {
- openConnection(l);
- }
- }
-
-
- /**
- * Initiates open connection request and registers the pending socket
- * channel with the given IO loop.
- *
- * @param loop loop with which the channel should be registered
- * @throws java.io.IOException if the socket could not be open or connected
- */
- private void openConnection(CustomIOLoop loop) throws IOException {
- SocketAddress sa = new InetSocketAddress(ip, port);
- SocketChannel ch = SocketChannel.open();
- ch.configureBlocking(false);
- loop.connectStream(ch);
- ch.connect(sa);
- }
-
-
- /**
- * Waits for the client workers to complete.
- *
- * @param secs timeout in seconds
- * @throws java.util.concurrent.ExecutionException if execution failed
- * @throws InterruptedException if interrupt occurred while waiting
- * @throws java.util.concurrent.TimeoutException if timeout occurred
- */
- public void await(int secs) throws InterruptedException,
- ExecutionException, TimeoutException {
- for (CustomIOLoop l : iloops) {
- if (l.worker.task != null) {
- l.worker.task.get(secs, TimeUnit.SECONDS);
- latencyTotal += l.latencyTotal;
- latencyCount += l.latencyCount;
- }
- }
- messages.freeze();
- bytes.freeze();
- }
-
- /**
- * Reports on the accumulated throughput and latency.
- */
- public void report() {
- DecimalFormat f = new DecimalFormat("#,##0");
- out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
- f.format(messages.total()), f.format(bytes.total()),
- f.format(messages.throughput()),
- f.format(bytes.throughput() / (1024 * msgLength)),
- f.format(latencyTotal / latencyCount)));
- }
-
-
- // Loop for transfer of fixed-length messages
- private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
-
- Worker worker = new Worker();
- long latencyTotal = 0;
- long latencyCount = 0;
-
-
- public CustomIOLoop() throws IOException {
- super(500);
- }
-
-
- @Override
- protected TestMessageStream createStream(ByteChannel channel) {
- return new TestMessageStream(msgLength, channel, this);
- }
-
- @Override
- protected synchronized void removeStream(MessageStream<TestMessage> stream) {
- super.removeStream(stream);
- messages.add(stream.messagesIn().total());
- bytes.add(stream.bytesIn().total());
- stream.messagesOut().reset();
- stream.bytesOut().reset();
- }
-
- @Override
- protected void processMessages(List<TestMessage> messages,
- MessageStream<TestMessage> stream) {
- for (TestMessage message : messages) {
- // TODO: summarize latency data better
- latencyTotal += nanoTime() - message.requestorTime();
- latencyCount++;
- }
- worker.release(messages.size());
- }
-
- @Override
- protected void connect(SelectionKey key) throws IOException {
- super.connect(key);
- TestMessageStream b = (TestMessageStream) key.attachment();
- Worker w = ((CustomIOLoop) b.loop()).worker;
- w.pump(b);
- }
-
- }
-
- /**
- * Auxiliary worker to connect and pump batched messages using blocking I/O.
- */
- private class Worker implements Runnable {
-
- private static final int BATCH_SIZE = 50;
- private static final int PERMITS = 2 * BATCH_SIZE;
-
- private TestMessageStream stream;
- private FutureTask<Worker> task;
-
- // Stuff to throttle pump
- private final Semaphore semaphore = new Semaphore(PERMITS);
- private int msgWritten;
-
- void pump(TestMessageStream stream) {
- this.stream = stream;
- task = new FutureTask<>(this, this);
- wpool.execute(task);
- }
-
- @Override
- public void run() {
- try {
- log.info("Worker started...");
-
- while (msgWritten < msgCount) {
- int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
- writeBatch(size);
- msgWritten += size;
- }
-
- // Now try to get all the permits back before sending poison pill
- semaphore.acquireUninterruptibly(PERMITS);
- stream.close();
-
- log.info("Worker done...");
-
- } catch (IOException e) {
- log.error("Worker unable to perform I/O", e);
- }
- }
-
-
- private void writeBatch(int size) throws IOException {
- // Build a batch of messages
- List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
- for (int i = 0; i < size; i++) {
- batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
- }
- acquire(size);
- stream.write(batch);
- }
-
-
- // Release permits based on the specified number of message credits
- private void release(int permits) {
- semaphore.release(permits);
- }
-
- // Acquire permit for a single batch
- private void acquire(int permits) {
- semaphore.acquireUninterruptibly(permits);
- }
-
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
deleted file mode 100644
index d5ce5f39..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import com.google.common.collect.Lists;
-import org.onlab.util.Counter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static java.lang.String.format;
-import static java.lang.System.out;
-import static org.onlab.util.Tools.delay;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Auxiliary test fixture to measure speed of NIO-based channels.
- */
-public class IOLoopTestServer {
-
- private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
-
- private static final int PRUNE_FREQUENCY = 1000;
-
- static final int PORT = 9876;
- static final long TIMEOUT = 1000;
-
- static final boolean SO_NO_DELAY = false;
- static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
- static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
-
- static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
-
- private final AcceptorLoop aloop;
- private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
-
- private final List<CustomIOLoop> iloops = new ArrayList<>();
- private final ExecutorService ipool;
-
- private final int workerCount;
- private final int msgLength;
- private int lastWorker = -1;
-
- Counter messages;
- Counter bytes;
-
- /**
- * Main entry point to launch the server.
- *
- * @param args command-line arguments
- * @throws java.io.IOException if unable to crate IO loops
- */
- public static void main(String[] args) throws IOException {
- startStandalone(args);
- System.exit(0);
- }
-
- /**
- * Starts a standalone IO loop test server.
- *
- * @param args command-line arguments
- */
- public static void startStandalone(String[] args) throws IOException {
- InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
- int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
- int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
-
- log.info("Setting up the server with {} workers, {} byte messages on {}... ",
- wc, ml, ip);
- IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
- server.start();
-
- // Start pruning clients and keep going until their number goes to 0.
- int remaining = -1;
- while (remaining == -1 || remaining > 0) {
- delay(PRUNE_FREQUENCY);
- int r = server.prune();
- remaining = remaining == -1 && r == 0 ? remaining : r;
- }
- server.stop();
- }
-
- /**
- * Creates a speed server.
- *
- * @param ip optional ip of the adapter where to bind
- * @param wc worker count
- * @param ml message length in bytes
- * @param port listen port
- * @throws java.io.IOException if unable to create IO loops
- */
- public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
- this.workerCount = wc;
- this.msgLength = ml;
- this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
-
- this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
- for (int i = 0; i < workerCount; i++) {
- iloops.add(new CustomIOLoop());
- }
- }
-
- /**
- * Start the server IO loops and kicks off throughput tracking.
- */
- public void start() {
- messages = new Counter();
- bytes = new Counter();
-
- for (CustomIOLoop l : iloops) {
- ipool.execute(l);
- }
- apool.execute(aloop);
-
- for (CustomIOLoop l : iloops) {
- l.awaitStart(TIMEOUT);
- }
- aloop.awaitStart(TIMEOUT);
- }
-
- /**
- * Stop the server IO loops and freezes throughput tracking.
- */
- public void stop() {
- aloop.shutdown();
- for (CustomIOLoop l : iloops) {
- l.shutdown();
- }
-
- for (CustomIOLoop l : iloops) {
- l.awaitStop(TIMEOUT);
- }
- aloop.awaitStop(TIMEOUT);
-
- messages.freeze();
- bytes.freeze();
- }
-
- /**
- * Reports on the accumulated throughput and latency.
- */
- public void report() {
- DecimalFormat f = new DecimalFormat("#,##0");
- out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
- f.format(messages.total()), f.format(bytes.total()),
- f.format(messages.throughput()),
- f.format(bytes.throughput() / (1024 * msgLength))));
- }
-
- /**
- * Prunes the IO loops of stale message buffers.
- *
- * @return number of remaining IO loops among all workers.
- */
- public int prune() {
- int count = 0;
- for (CustomIOLoop l : iloops) {
- count += l.pruneStaleStreams();
- }
- return count;
- }
-
- // Get the next worker to which a client should be assigned
- private synchronized CustomIOLoop nextWorker() {
- lastWorker = (lastWorker + 1) % workerCount;
- return iloops.get(lastWorker);
- }
-
- // Loop for transfer of fixed-length messages
- private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
-
- public CustomIOLoop() throws IOException {
- super(500);
- }
-
- @Override
- protected TestMessageStream createStream(ByteChannel channel) {
- return new TestMessageStream(msgLength, channel, this);
- }
-
- @Override
- protected void removeStream(MessageStream<TestMessage> stream) {
- super.removeStream(stream);
- messages.add(stream.messagesIn().total());
- bytes.add(stream.bytesIn().total());
- }
-
- @Override
- protected void processMessages(List<TestMessage> messages,
- MessageStream<TestMessage> stream) {
- try {
- stream.write(createResponses(messages));
- } catch (IOException e) {
- log.error("Unable to echo messages", e);
- }
- }
-
- private List<TestMessage> createResponses(List<TestMessage> messages) {
- List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
- for (TestMessage message : messages) {
- responses.add(new TestMessage(message.length(), message.requestorTime(),
- System.nanoTime(), message.padding()));
- }
- return responses;
- }
- }
-
- // Loop for accepting client connections
- private class CustomAcceptLoop extends AcceptorLoop {
-
- public CustomAcceptLoop(SocketAddress address) throws IOException {
- super(500, address);
- }
-
- @Override
- protected void acceptConnection(ServerSocketChannel channel) throws IOException {
- SocketChannel sc = channel.accept();
- sc.configureBlocking(false);
-
- Socket so = sc.socket();
- so.setTcpNoDelay(SO_NO_DELAY);
- so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
- so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
-
- nextWorker().acceptStream(sc);
- log.info("Connected client");
- }
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
deleted file mode 100644
index 9965d389..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-/**
- * Tests of the message message stream implementation.
- */
-public class MessageStreamTest {
-
- private static final int SIZE = 64;
- private static final int BIG_SIZE = 32 * 1024;
-
- private TestMessage message;
-
- private TestIOLoop loop;
- private TestByteChannel channel;
- private TestMessageStream stream;
- private TestKey key;
-
- @Before
- public void setUp() throws IOException {
- loop = new TestIOLoop();
- channel = new TestByteChannel();
- key = new TestKey(channel);
- stream = loop.createStream(channel);
- stream.setKey(key);
- stream.setNonStrict();
- message = new TestMessage(SIZE, 0, 0, stream.padding());
- }
-
- @After
- public void tearDown() {
- loop.shutdown();
- stream.close();
- }
-
- // Validates the state of the message stream
- private void validate(boolean wp, boolean fr, int read, int written) {
- assertEquals(wp, stream.isWritePending());
- assertEquals(fr, stream.isFlushRequired());
- assertEquals(read, channel.readBytes);
- assertEquals(written, channel.writtenBytes);
- }
-
- @Test
- public void endOfStream() throws IOException {
- channel.close();
- List<TestMessage> messages = stream.read();
- assertNull(messages);
- }
-
- @Test
- public void bufferGrowth() throws IOException {
- // Create a stream for big messages and test the growth.
- stream = new TestMessageStream(BIG_SIZE, channel, loop);
- TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
-
- stream.write(bigMessage);
- stream.write(bigMessage);
- stream.write(bigMessage);
- stream.write(bigMessage);
- stream.write(bigMessage);
- }
-
- @Test
- public void discardBeforeKey() {
- // Create a stream that does not yet have the key set and discard it.
- stream = loop.createStream(channel);
- assertNull(stream.key());
- stream.close();
- // There is not key, so nothing to check; we just expect no problem.
- }
-
- @Test
- public void bufferedRead() throws IOException {
- channel.bytesToRead = SIZE + 4;
- List<TestMessage> messages = stream.read();
- assertEquals(1, messages.size());
- validate(false, false, SIZE + 4, 0);
-
- channel.bytesToRead = SIZE - 4;
- messages = stream.read();
- assertEquals(1, messages.size());
- validate(false, false, SIZE * 2, 0);
- }
-
- @Test
- public void bufferedWrite() throws IOException {
- validate(false, false, 0, 0);
-
- // First write is immediate...
- stream.write(message);
- validate(false, false, 0, SIZE);
-
- // Second and third get buffered...
- stream.write(message);
- validate(false, true, 0, SIZE);
- stream.write(message);
- validate(false, true, 0, SIZE);
-
- // Reset write, which will flush if needed; the next write is again buffered
- stream.flushIfWriteNotPending();
- validate(false, false, 0, SIZE * 3);
- stream.write(message);
- validate(false, true, 0, SIZE * 3);
-
- // Select reset, which will flush if needed; the next write is again buffered
- stream.flushIfPossible();
- validate(false, false, 0, SIZE * 4);
- stream.write(message);
- validate(false, true, 0, SIZE * 4);
- stream.flush();
- validate(false, true, 0, SIZE * 4);
- }
-
- @Test
- public void bufferedWriteList() throws IOException {
- validate(false, false, 0, 0);
-
- // First write is immediate...
- List<TestMessage> messages = new ArrayList<>();
- messages.add(message);
- messages.add(message);
- messages.add(message);
- messages.add(message);
-
- stream.write(messages);
- validate(false, false, 0, SIZE * 4);
-
- stream.write(messages);
- validate(false, true, 0, SIZE * 4);
-
- stream.flushIfPossible();
- validate(false, false, 0, SIZE * 8);
- }
-
- @Test
- public void bufferedPartialWrite() throws IOException {
- validate(false, false, 0, 0);
-
- // First write is immediate...
- stream.write(message);
- validate(false, false, 0, SIZE);
-
- // Tell test channel to accept only half.
- channel.bytesToWrite = SIZE / 2;
-
- // Second and third get buffered...
- stream.write(message);
- validate(false, true, 0, SIZE);
- stream.flushIfPossible();
- validate(true, true, 0, SIZE + SIZE / 2);
- }
-
- @Test
- public void bufferedPartialWrite2() throws IOException {
- validate(false, false, 0, 0);
-
- // First write is immediate...
- stream.write(message);
- validate(false, false, 0, SIZE);
-
- // Tell test channel to accept only half.
- channel.bytesToWrite = SIZE / 2;
-
- // Second and third get buffered...
- stream.write(message);
- validate(false, true, 0, SIZE);
- stream.flushIfWriteNotPending();
- validate(true, true, 0, SIZE + SIZE / 2);
- }
-
- @Test
- public void bufferedReadWrite() throws IOException {
- channel.bytesToRead = SIZE + 4;
- List<TestMessage> messages = stream.read();
- assertEquals(1, messages.size());
- validate(false, false, SIZE + 4, 0);
-
- stream.write(message);
- validate(false, false, SIZE + 4, SIZE);
-
- channel.bytesToRead = SIZE - 4;
- messages = stream.read();
- assertEquals(1, messages.size());
- validate(false, false, SIZE * 2, SIZE);
- }
-
- // Fake IO driver loop
- private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
-
- public TestIOLoop() throws IOException {
- super(500);
- }
-
- @Override
- protected TestMessageStream createStream(ByteChannel channel) {
- return new TestMessageStream(SIZE, channel, this);
- }
-
- @Override
- protected void processMessages(List<TestMessage> messages,
- MessageStream<TestMessage> stream) {
- }
-
- }
-
- // Byte channel test fixture
- private static class TestByteChannel extends SelectableChannel implements ByteChannel {
-
- private static final int BUFFER_LENGTH = 1024;
- byte[] bytes = new byte[BUFFER_LENGTH];
- int bytesToWrite = BUFFER_LENGTH;
- int bytesToRead = BUFFER_LENGTH;
- int writtenBytes = 0;
- int readBytes = 0;
-
- @Override
- public int read(ByteBuffer dst) throws IOException {
- int l = Math.min(dst.remaining(), bytesToRead);
- if (bytesToRead > 0) {
- readBytes += l;
- dst.put(bytes, 0, l);
- }
- return l;
- }
-
- @Override
- public int write(ByteBuffer src) throws IOException {
- int l = Math.min(src.remaining(), bytesToWrite);
- writtenBytes += l;
- src.get(bytes, 0, l);
- return l;
- }
-
- @Override
- public Object blockingLock() {
- return null;
- }
-
- @Override
- public SelectableChannel configureBlocking(boolean arg0) throws IOException {
- return null;
- }
-
- @Override
- public boolean isBlocking() {
- return false;
- }
-
- @Override
- public boolean isRegistered() {
- return false;
- }
-
- @Override
- public SelectionKey keyFor(Selector arg0) {
- return null;
- }
-
- @Override
- public SelectorProvider provider() {
- return null;
- }
-
- @Override
- public SelectionKey register(Selector arg0, int arg1, Object arg2)
- throws ClosedChannelException {
- return null;
- }
-
- @Override
- public int validOps() {
- return 0;
- }
-
- @Override
- protected void implCloseChannel() throws IOException {
- bytesToRead = -1;
- }
-
- }
-
- // Selection key text fixture
- private static class TestKey extends SelectionKey {
-
- private SelectableChannel channel;
-
- public TestKey(TestByteChannel channel) {
- this.channel = channel;
- }
-
- @Override
- public void cancel() {
- }
-
- @Override
- public SelectableChannel channel() {
- return channel;
- }
-
- @Override
- public int interestOps() {
- return 0;
- }
-
- @Override
- public SelectionKey interestOps(int ops) {
- return null;
- }
-
- @Override
- public boolean isValid() {
- return true;
- }
-
- @Override
- public int readyOps() {
- return 0;
- }
-
- @Override
- public Selector selector() {
- return null;
- }
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java
deleted file mode 100644
index cf5b2bb9..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.AbstractSelectableChannel;
-import java.nio.channels.spi.AbstractSelector;
-import java.util.Set;
-
-/**
- * A selector instrumented for unit tests.
- */
-public class MockSelector extends AbstractSelector {
-
- int wakeUpCount = 0;
-
- /**
- * Creates a mock selector, specifying null as the SelectorProvider.
- */
- public MockSelector() {
- super(null);
- }
-
- @Override
- public String toString() {
- return "{MockSelector: wake=" + wakeUpCount + "}";
- }
-
- @Override
- protected void implCloseSelector() throws IOException {
- }
-
- @Override
- protected SelectionKey register(AbstractSelectableChannel ch, int ops,
- Object att) {
- return null;
- }
-
- @Override
- public Set<SelectionKey> keys() {
- return null;
- }
-
- @Override
- public Set<SelectionKey> selectedKeys() {
- return null;
- }
-
- @Override
- public int selectNow() throws IOException {
- return 0;
- }
-
- @Override
- public int select(long timeout) throws IOException {
- return 0;
- }
-
- @Override
- public int select() throws IOException {
- return 0;
- }
-
- @Override
- public Selector wakeup() {
- wakeUpCount++;
- return null;
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
deleted file mode 100644
index 825479b5..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Test message for measuring rate and round-trip latency.
- */
-public class TestMessage extends AbstractMessage {
-
- private final byte[] padding;
-
- private final long requestorTime;
- private final long responderTime;
-
- /**
- * Creates a new message with the specified data.
- *
- * @param requestorTime requester time
- * @param responderTime responder time
- * @param padding message padding
- */
- TestMessage(int length, long requestorTime, long responderTime, byte[] padding) {
- this.length = length;
- this.requestorTime = requestorTime;
- this.responderTime = responderTime;
- this.padding = checkNotNull(padding, "Padding cannot be null");
- }
-
- public long requestorTime() {
- return requestorTime;
- }
-
- public long responderTime() {
- return responderTime;
- }
-
- public byte[] padding() {
- return padding;
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
deleted file mode 100644
index fc86beb9..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Fixed-length message transfer buffer.
- */
-public class TestMessageStream extends MessageStream<TestMessage> {
-
- private static final String E_WRONG_LEN = "Illegal message length: ";
- private static final long START_TAG = 0xfeedcafedeaddeedL;
- private static final long END_TAG = 0xbeadcafedeaddeedL;
- private static final int META_LENGTH = 40;
-
- private final int length;
- private boolean isStrict = true;
-
- public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
- super(loop, ch, 64 * 1024, 500);
- checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
- this.length = length;
- }
-
- void setNonStrict() {
- isStrict = false;
- }
-
- @Override
- protected TestMessage read(ByteBuffer rb) {
- if (rb.remaining() < length) {
- return null;
- }
-
- long startTag = rb.getLong();
- if (isStrict) {
- checkState(startTag == START_TAG, "Incorrect message start");
- }
-
- long size = rb.getLong();
- long requestorTime = rb.getLong();
- long responderTime = rb.getLong();
- byte[] padding = padding();
- rb.get(padding);
-
- long endTag = rb.getLong();
- if (isStrict) {
- checkState(endTag == END_TAG, "Incorrect message end");
- }
-
- return new TestMessage((int) size, requestorTime, responderTime, padding);
- }
-
- @Override
- protected void write(TestMessage message, ByteBuffer wb) {
- if (message.length() != length) {
- throw new IllegalArgumentException(E_WRONG_LEN + message.length());
- }
-
- wb.putLong(START_TAG);
- wb.putLong(message.length());
- wb.putLong(message.requestorTime());
- wb.putLong(message.responderTime());
- wb.put(message.padding(), 0, length - META_LENGTH);
- wb.putLong(END_TAG);
- }
-
- public byte[] padding() {
- return new byte[length - META_LENGTH];
- }
-}