aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java')
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java324
1 files changed, 0 insertions, 324 deletions
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
deleted file mode 100644
index acc9a08f..00000000
--- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import com.google.common.collect.Lists;
-import org.onlab.util.Counter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.lang.String.format;
-import static java.lang.System.nanoTime;
-import static java.lang.System.out;
-import static org.onlab.nio.IOLoopTestServer.PORT;
-import static org.onlab.util.Tools.delay;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Auxiliary test fixture to measure speed of NIO-based channels.
- */
-public class IOLoopTestClient {
-
- private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
-
- private final InetAddress ip;
- private final int port;
- private final int msgCount;
- private final int msgLength;
-
- private final List<CustomIOLoop> iloops = new ArrayList<>();
- private final ExecutorService ipool;
- private final ExecutorService wpool;
-
- Counter messages;
- Counter bytes;
- long latencyTotal = 0;
- long latencyCount = 0;
-
-
- /**
- * Main entry point to launch the client.
- *
- * @param args command-line arguments
- * @throws java.io.IOException if unable to connect to server
- * @throws InterruptedException if latch wait gets interrupted
- * @throws java.util.concurrent.ExecutionException if wait gets interrupted
- * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
- */
- public static void main(String[] args)
- throws IOException, InterruptedException, ExecutionException, TimeoutException {
- startStandalone(args);
-
- System.exit(0);
- }
-
- /**
- * Starts a standalone IO loop test client.
- *
- * @param args command-line arguments
- */
- public static void startStandalone(String[] args)
- throws IOException, InterruptedException, ExecutionException, TimeoutException {
- InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
- int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
- int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
- int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
- int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
-
- log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
- wc, mc, ml, ip);
- IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
-
- client.start();
- delay(500);
-
- client.await(to);
- client.report();
- }
-
- /**
- * Creates a speed client.
- *
- * @param ip ip address of server
- * @param wc worker count
- * @param mc message count to send per client
- * @param ml message length in bytes
- * @param port socket port
- * @throws java.io.IOException if unable to create IO loops
- */
- public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
- this.ip = ip;
- this.port = port;
- this.msgCount = mc;
- this.msgLength = ml;
- this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
- this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
-
- for (int i = 0; i < wc; i++) {
- iloops.add(new CustomIOLoop());
- }
- }
-
- /**
- * Starts the client workers.
- *
- * @throws java.io.IOException if unable to open connection
- */
- public void start() throws IOException {
- messages = new Counter();
- bytes = new Counter();
-
- // First start up all the IO loops
- for (CustomIOLoop l : iloops) {
- ipool.execute(l);
- }
-
- // Wait for all of them to get going
- for (CustomIOLoop l : iloops) {
- l.awaitStart(1000);
- }
-
- // ... and Next open all connections; one-per-loop
- for (CustomIOLoop l : iloops) {
- openConnection(l);
- }
- }
-
-
- /**
- * Initiates open connection request and registers the pending socket
- * channel with the given IO loop.
- *
- * @param loop loop with which the channel should be registered
- * @throws java.io.IOException if the socket could not be open or connected
- */
- private void openConnection(CustomIOLoop loop) throws IOException {
- SocketAddress sa = new InetSocketAddress(ip, port);
- SocketChannel ch = SocketChannel.open();
- ch.configureBlocking(false);
- loop.connectStream(ch);
- ch.connect(sa);
- }
-
-
- /**
- * Waits for the client workers to complete.
- *
- * @param secs timeout in seconds
- * @throws java.util.concurrent.ExecutionException if execution failed
- * @throws InterruptedException if interrupt occurred while waiting
- * @throws java.util.concurrent.TimeoutException if timeout occurred
- */
- public void await(int secs) throws InterruptedException,
- ExecutionException, TimeoutException {
- for (CustomIOLoop l : iloops) {
- if (l.worker.task != null) {
- l.worker.task.get(secs, TimeUnit.SECONDS);
- latencyTotal += l.latencyTotal;
- latencyCount += l.latencyCount;
- }
- }
- messages.freeze();
- bytes.freeze();
- }
-
- /**
- * Reports on the accumulated throughput and latency.
- */
- public void report() {
- DecimalFormat f = new DecimalFormat("#,##0");
- out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
- f.format(messages.total()), f.format(bytes.total()),
- f.format(messages.throughput()),
- f.format(bytes.throughput() / (1024 * msgLength)),
- f.format(latencyTotal / latencyCount)));
- }
-
-
- // Loop for transfer of fixed-length messages
- private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
-
- Worker worker = new Worker();
- long latencyTotal = 0;
- long latencyCount = 0;
-
-
- public CustomIOLoop() throws IOException {
- super(500);
- }
-
-
- @Override
- protected TestMessageStream createStream(ByteChannel channel) {
- return new TestMessageStream(msgLength, channel, this);
- }
-
- @Override
- protected synchronized void removeStream(MessageStream<TestMessage> stream) {
- super.removeStream(stream);
- messages.add(stream.messagesIn().total());
- bytes.add(stream.bytesIn().total());
- stream.messagesOut().reset();
- stream.bytesOut().reset();
- }
-
- @Override
- protected void processMessages(List<TestMessage> messages,
- MessageStream<TestMessage> stream) {
- for (TestMessage message : messages) {
- // TODO: summarize latency data better
- latencyTotal += nanoTime() - message.requestorTime();
- latencyCount++;
- }
- worker.release(messages.size());
- }
-
- @Override
- protected void connect(SelectionKey key) throws IOException {
- super.connect(key);
- TestMessageStream b = (TestMessageStream) key.attachment();
- Worker w = ((CustomIOLoop) b.loop()).worker;
- w.pump(b);
- }
-
- }
-
- /**
- * Auxiliary worker to connect and pump batched messages using blocking I/O.
- */
- private class Worker implements Runnable {
-
- private static final int BATCH_SIZE = 50;
- private static final int PERMITS = 2 * BATCH_SIZE;
-
- private TestMessageStream stream;
- private FutureTask<Worker> task;
-
- // Stuff to throttle pump
- private final Semaphore semaphore = new Semaphore(PERMITS);
- private int msgWritten;
-
- void pump(TestMessageStream stream) {
- this.stream = stream;
- task = new FutureTask<>(this, this);
- wpool.execute(task);
- }
-
- @Override
- public void run() {
- try {
- log.info("Worker started...");
-
- while (msgWritten < msgCount) {
- int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
- writeBatch(size);
- msgWritten += size;
- }
-
- // Now try to get all the permits back before sending poison pill
- semaphore.acquireUninterruptibly(PERMITS);
- stream.close();
-
- log.info("Worker done...");
-
- } catch (IOException e) {
- log.error("Worker unable to perform I/O", e);
- }
- }
-
-
- private void writeBatch(int size) throws IOException {
- // Build a batch of messages
- List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
- for (int i = 0; i < size; i++) {
- batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
- }
- acquire(size);
- stream.write(batch);
- }
-
-
- // Release permits based on the specified number of message credits
- private void release(int permits) {
- semaphore.release(permits);
- }
-
- // Acquire permit for a single batch
- private void acquire(int permits) {
- semaphore.acquireUninterruptibly(permits);
- }
-
- }
-
-}