diff options
Diffstat (limited to 'framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java')
-rw-r--r-- | framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java | 324 |
1 files changed, 0 insertions, 324 deletions
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java deleted file mode 100644 index acc9a08f..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import com.google.common.collect.Lists; -import org.onlab.util.Counter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.lang.String.format; -import static java.lang.System.nanoTime; -import static java.lang.System.out; -import static org.onlab.nio.IOLoopTestServer.PORT; -import static org.onlab.util.Tools.delay; -import static org.onlab.util.Tools.namedThreads; - -/** - * Auxiliary test fixture to measure speed of NIO-based channels. - */ -public class IOLoopTestClient { - - private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class); - - private final InetAddress ip; - private final int port; - private final int msgCount; - private final int msgLength; - - private final List<CustomIOLoop> iloops = new ArrayList<>(); - private final ExecutorService ipool; - private final ExecutorService wpool; - - Counter messages; - Counter bytes; - long latencyTotal = 0; - long latencyCount = 0; - - - /** - * Main entry point to launch the client. - * - * @param args command-line arguments - * @throws java.io.IOException if unable to connect to server - * @throws InterruptedException if latch wait gets interrupted - * @throws java.util.concurrent.ExecutionException if wait gets interrupted - * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion - */ - public static void main(String[] args) - throws IOException, InterruptedException, ExecutionException, TimeoutException { - startStandalone(args); - - System.exit(0); - } - - /** - * Starts a standalone IO loop test client. - * - * @param args command-line arguments - */ - public static void startStandalone(String[] args) - throws IOException, InterruptedException, ExecutionException, TimeoutException { - InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1"); - int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; - int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000; - int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128; - int to = args.length > 4 ? Integer.parseInt(args[4]) : 60; - - log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ", - wc, mc, ml, ip); - IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT); - - client.start(); - delay(500); - - client.await(to); - client.report(); - } - - /** - * Creates a speed client. - * - * @param ip ip address of server - * @param wc worker count - * @param mc message count to send per client - * @param ml message length in bytes - * @param port socket port - * @throws java.io.IOException if unable to create IO loops - */ - public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException { - this.ip = ip; - this.port = port; - this.msgCount = mc; - this.msgLength = ml; - this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker")); - this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop")); - - for (int i = 0; i < wc; i++) { - iloops.add(new CustomIOLoop()); - } - } - - /** - * Starts the client workers. - * - * @throws java.io.IOException if unable to open connection - */ - public void start() throws IOException { - messages = new Counter(); - bytes = new Counter(); - - // First start up all the IO loops - for (CustomIOLoop l : iloops) { - ipool.execute(l); - } - - // Wait for all of them to get going - for (CustomIOLoop l : iloops) { - l.awaitStart(1000); - } - - // ... and Next open all connections; one-per-loop - for (CustomIOLoop l : iloops) { - openConnection(l); - } - } - - - /** - * Initiates open connection request and registers the pending socket - * channel with the given IO loop. - * - * @param loop loop with which the channel should be registered - * @throws java.io.IOException if the socket could not be open or connected - */ - private void openConnection(CustomIOLoop loop) throws IOException { - SocketAddress sa = new InetSocketAddress(ip, port); - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - loop.connectStream(ch); - ch.connect(sa); - } - - - /** - * Waits for the client workers to complete. - * - * @param secs timeout in seconds - * @throws java.util.concurrent.ExecutionException if execution failed - * @throws InterruptedException if interrupt occurred while waiting - * @throws java.util.concurrent.TimeoutException if timeout occurred - */ - public void await(int secs) throws InterruptedException, - ExecutionException, TimeoutException { - for (CustomIOLoop l : iloops) { - if (l.worker.task != null) { - l.worker.task.get(secs, TimeUnit.SECONDS); - latencyTotal += l.latencyTotal; - latencyCount += l.latencyCount; - } - } - messages.freeze(); - bytes.freeze(); - } - - /** - * Reports on the accumulated throughput and latency. - */ - public void report() { - DecimalFormat f = new DecimalFormat("#,##0"); - out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency", - f.format(messages.total()), f.format(bytes.total()), - f.format(messages.throughput()), - f.format(bytes.throughput() / (1024 * msgLength)), - f.format(latencyTotal / latencyCount))); - } - - - // Loop for transfer of fixed-length messages - private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> { - - Worker worker = new Worker(); - long latencyTotal = 0; - long latencyCount = 0; - - - public CustomIOLoop() throws IOException { - super(500); - } - - - @Override - protected TestMessageStream createStream(ByteChannel channel) { - return new TestMessageStream(msgLength, channel, this); - } - - @Override - protected synchronized void removeStream(MessageStream<TestMessage> stream) { - super.removeStream(stream); - messages.add(stream.messagesIn().total()); - bytes.add(stream.bytesIn().total()); - stream.messagesOut().reset(); - stream.bytesOut().reset(); - } - - @Override - protected void processMessages(List<TestMessage> messages, - MessageStream<TestMessage> stream) { - for (TestMessage message : messages) { - // TODO: summarize latency data better - latencyTotal += nanoTime() - message.requestorTime(); - latencyCount++; - } - worker.release(messages.size()); - } - - @Override - protected void connect(SelectionKey key) throws IOException { - super.connect(key); - TestMessageStream b = (TestMessageStream) key.attachment(); - Worker w = ((CustomIOLoop) b.loop()).worker; - w.pump(b); - } - - } - - /** - * Auxiliary worker to connect and pump batched messages using blocking I/O. - */ - private class Worker implements Runnable { - - private static final int BATCH_SIZE = 50; - private static final int PERMITS = 2 * BATCH_SIZE; - - private TestMessageStream stream; - private FutureTask<Worker> task; - - // Stuff to throttle pump - private final Semaphore semaphore = new Semaphore(PERMITS); - private int msgWritten; - - void pump(TestMessageStream stream) { - this.stream = stream; - task = new FutureTask<>(this, this); - wpool.execute(task); - } - - @Override - public void run() { - try { - log.info("Worker started..."); - - while (msgWritten < msgCount) { - int size = Math.min(BATCH_SIZE, msgCount - msgWritten); - writeBatch(size); - msgWritten += size; - } - - // Now try to get all the permits back before sending poison pill - semaphore.acquireUninterruptibly(PERMITS); - stream.close(); - - log.info("Worker done..."); - - } catch (IOException e) { - log.error("Worker unable to perform I/O", e); - } - } - - - private void writeBatch(int size) throws IOException { - // Build a batch of messages - List<TestMessage> batch = Lists.newArrayListWithCapacity(size); - for (int i = 0; i < size; i++) { - batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding())); - } - acquire(size); - stream.write(batch); - } - - - // Release permits based on the specified number of message credits - private void release(int permits) { - semaphore.release(permits); - } - - // Acquire permit for a single batch - private void acquire(int permits) { - semaphore.acquireUninterruptibly(permits); - } - - } - -} |