diff options
Diffstat (limited to 'framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java')
-rw-r--r-- | framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java | 596 |
1 files changed, 596 insertions, 0 deletions
diff --git a/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java new file mode 100644 index 00000000..de9e9f21 --- /dev/null +++ b/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java @@ -0,0 +1,596 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.intentperf; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import org.apache.commons.lang.math.RandomUtils; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Modified; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.packet.MacAddress; +import org.onlab.util.Counter; +import org.onosproject.cfg.ComponentConfigService; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.Device; +import org.onosproject.net.PortNumber; +import org.onosproject.net.device.DeviceService; +import org.onosproject.net.flow.DefaultTrafficSelector; +import org.onosproject.net.flow.DefaultTrafficTreatment; +import org.onosproject.net.flow.TrafficSelector; +import org.onosproject.net.flow.TrafficTreatment; +import org.onosproject.net.intent.Intent; +import org.onosproject.net.intent.IntentEvent; +import org.onosproject.net.intent.IntentListener; +import org.onosproject.net.intent.IntentService; +import org.onosproject.net.intent.Key; +import org.onosproject.net.intent.PartitionService; +import org.onosproject.net.intent.PointToPointIntent; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterMessage; +import org.onosproject.store.cluster.messaging.ClusterMessageHandler; +import org.onosproject.store.cluster.messaging.MessageSubject; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Dictionary; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.lang.String.format; +import static java.lang.System.currentTimeMillis; +import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY; +import static org.onlab.util.Tools.*; +import static org.onosproject.net.intent.IntentEvent.Type.*; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Application to test sustained intent throughput. + */ +@Component(immediate = true) +@Service(value = IntentPerfInstaller.class) +public class IntentPerfInstaller { + + private final Logger log = getLogger(getClass()); + + private static final int DEFAULT_NUM_WORKERS = 1; + + private static final int DEFAULT_NUM_KEYS = 40000; + private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms + + private static final int DEFAULT_NUM_NEIGHBORS = 0; + + private static final int START_DELAY = 5_000; // ms + private static final int REPORT_PERIOD = 1_000; //ms + + private static final String START = "start"; + private static final String STOP = "stop"; + private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl"); + + //FIXME add path length + + @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS, + label = "Number of keys (i.e. unique intents) to generate per instance") + private int numKeys = DEFAULT_NUM_KEYS; + + //TODO implement numWorkers property +// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS, +// label = "Number of installer threads per instance") +// private int numWokers = DEFAULT_NUM_WORKERS; + + @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD, + label = "Goal for cycle period (in ms)") + private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD; + + @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS, + label = "Number of neighbors to generate intents for") + private int numNeighbors = DEFAULT_NUM_NEIGHBORS; + + @Reference(cardinality = MANDATORY_UNARY) + protected CoreService coreService; + + @Reference(cardinality = MANDATORY_UNARY) + protected IntentService intentService; + + @Reference(cardinality = MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = MANDATORY_UNARY) + protected DeviceService deviceService; + + @Reference(cardinality = MANDATORY_UNARY) + protected MastershipService mastershipService; + + @Reference(cardinality = MANDATORY_UNARY) + protected PartitionService partitionService; + + @Reference(cardinality = MANDATORY_UNARY) + protected ComponentConfigService configService; + + @Reference(cardinality = MANDATORY_UNARY) + protected IntentPerfCollector sampleCollector; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService communicationService; + + private ExecutorService messageHandlingExecutor; + + private ExecutorService workers; + private ApplicationId appId; + private Listener listener; + private boolean stopped = true; + + private Timer reportTimer; + + // FIXME this variable isn't shared properly between multiple worker threads + private int lastKey = 0; + + private IntentPerfUi perfUi; + private NodeId nodeId; + private TimerTask reporterTask; + + @Activate + public void activate(ComponentContext context) { + configService.registerProperties(getClass()); + + nodeId = clusterService.getLocalNode().id(); + appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString()); + + // TODO: replace with shared timer + reportTimer = new Timer("onos-intent-perf-reporter"); + workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d")); + + // disable flow backups for testing + configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore", + "backupEnabled", "false"); + + // TODO: replace with shared executor + messageHandlingExecutor = Executors.newSingleThreadExecutor( + groupedThreads("onos/perf", "command-handler")); + + communicationService.addSubscriber(CONTROL, new InternalControl(), + messageHandlingExecutor); + + listener = new Listener(); + intentService.addListener(listener); + + // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation + modify(context); + } + + @Deactivate + public void deactivate() { + stopTestRun(); + + configService.unregisterProperties(getClass(), false); + messageHandlingExecutor.shutdown(); + communicationService.removeSubscriber(CONTROL); + + if (listener != null) { + reportTimer.cancel(); + intentService.removeListener(listener); + listener = null; + reportTimer = null; + } + } + + @Modified + public void modify(ComponentContext context) { + if (context == null) { + logConfig("Reconfigured"); + return; + } + + Dictionary<?, ?> properties = context.getProperties(); + int newNumKeys, newCyclePeriod, newNumNeighbors; + try { + String s = get(properties, "numKeys"); + newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim()); + + s = get(properties, "cyclePeriod"); + newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim()); + + s = get(properties, "numNeighbors"); + newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim()); + + } catch (NumberFormatException | ClassCastException e) { + log.warn("Malformed configuration detected; using defaults", e); + newNumKeys = DEFAULT_NUM_KEYS; + newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD; + newNumNeighbors = DEFAULT_NUM_NEIGHBORS; + } + + if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) { + numKeys = newNumKeys; + cyclePeriod = newCyclePeriod; + numNeighbors = newNumNeighbors; + logConfig("Reconfigured"); + } + } + + public void start() { + if (stopped) { + stopped = false; + communicationService.broadcast(START, CONTROL, str -> str.getBytes()); + startTestRun(); + } + } + + public void stop() { + if (!stopped) { + communicationService.broadcast(STOP, CONTROL, str -> str.getBytes()); + stopTestRun(); + } + } + + private void logConfig(String prefix) { + log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}", + prefix, appId.id(), numKeys, cyclePeriod, numNeighbors); + } + + private void startTestRun() { + sampleCollector.clearSamples(); + + // adjust numNeighbors and generate list of neighbors + numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors); + + // Schedule reporter task on report period boundary + reporterTask = new ReporterTask(); + reportTimer.scheduleAtFixedRate(reporterTask, + REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, + REPORT_PERIOD); + + // Submit workers + stopped = false; + for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) { + workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey))); + } + log.info("Started test run"); + } + + private void stopTestRun() { + if (reporterTask != null) { + reporterTask.cancel(); + reporterTask = null; + } + + try { + workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warn("Failed to stop worker", e); + } + + sampleCollector.recordSample(0, 0); + sampleCollector.recordSample(0, 0); + stopped = true; + + log.info("Stopped test run"); + } + + private List<NodeId> getNeighbors() { + List<NodeId> nodes = clusterService.getNodes().stream() + .map(ControllerNode::id) + .collect(Collectors.toCollection(ArrayList::new)); + // sort neighbors by id + Collections.sort(nodes, (node1, node2) -> + node1.toString().compareTo(node2.toString())); + // rotate the local node to index 0 + Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id())); + log.debug("neighbors (raw): {}", nodes); //TODO remove + // generate the sub-list that will contain local node and selected neighbors + nodes = nodes.subList(0, numNeighbors + 1); + log.debug("neighbors: {}", nodes); //TODO remove + return nodes; + } + + private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) { + // choose a random device for which this node is master + List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList()); + Device device = deviceList.get(RandomUtils.nextInt(deviceList.size())); + + //FIXME we currently ignore the path length and always use the same device + TrafficSelector selector = DefaultTrafficSelector.builder() + .matchEthDst(MacAddress.valueOf(mac)).build(); + TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment(); + ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1)); + ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2)); + + return PointToPointIntent.builder() + .appId(appId) + .key(key) + .selector(selector) + .treatment(treatment) + .ingressPoint(ingress) + .egressPoint(egress) + .build(); + } + + /** + * Creates a specified number of intents for testing purposes. + * + * @param numberOfKeys number of intents + * @param pathLength path depth + * @param firstKey first key to attempt + * @return set of intents + */ + private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) { + List<NodeId> neighbors = getNeighbors(); + + Multimap<NodeId, Device> devices = ArrayListMultimap.create(); + deviceService.getAvailableDevices() + .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device)); + + // ensure that we have at least one device per neighbor + neighbors.forEach(node -> checkState(devices.get(node).size() > 0, + "There are no devices for {}", node)); + + // TODO pull this outside so that createIntent can use it + // prefix based on node id for keys generated on this instance + long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32; + + int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size()); + Multimap<NodeId, Intent> intents = ArrayListMultimap.create(); + + for (int count = 0, k = firstKey; count < numberOfKeys; k++) { + Key key = Key.of(keyPrefix + k, appId); + + NodeId leader = partitionService.getLeader(key); + if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) { + // Bail if we are not sending to this node or we have enough for this node + continue; + } + intents.put(leader, createIntent(key, keyPrefix + k, leader, devices)); + + // Bump up the counter and remember this as the last key used. + count++; + lastKey = k; + if (count % 1000 == 0) { + log.info("Building intents... {} (attempt: {})", count, lastKey); + } + } + checkState(intents.values().size() == numberOfKeys, + "Generated wrong number of intents"); + log.info("Created {} intents", numberOfKeys); + intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size())); + + return Sets.newHashSet(intents.values()); + } + + // Submits intent operations. + final class Submitter implements Runnable { + + private long lastDuration; + private int lastCount; + + private Set<Intent> intents = Sets.newHashSet(); + private Set<Intent> submitted = Sets.newHashSet(); + private Set<Intent> withdrawn = Sets.newHashSet(); + + private Submitter(Set<Intent> intents) { + this.intents = intents; + lastCount = numKeys / 4; + lastDuration = 1_000; // 1 second + } + + @Override + public void run() { + prime(); + while (!stopped) { + try { + cycle(); + } catch (Exception e) { + log.warn("Exception during cycle", e); + } + } + clear(); + } + + private Iterable<Intent> subset(Set<Intent> intents) { + List<Intent> subset = Lists.newArrayList(intents); + Collections.shuffle(subset); + return subset.subList(0, lastCount); + } + + // Submits the specified intent. + private void submit(Intent intent) { + intentService.submit(intent); + submitted.add(intent); + withdrawn.remove(intent); //TODO could check result here... + } + + // Withdraws the specified intent. + private void withdraw(Intent intent) { + intentService.withdraw(intent); + withdrawn.add(intent); + submitted.remove(intent); //TODO could check result here... + } + + // Primes the cycle. + private void prime() { + int i = 0; + withdrawn.addAll(intents); + for (Intent intent : intents) { + submit(intent); + // only submit half of the intents to start + if (i++ >= intents.size() / 2) { + break; + } + } + } + + private void clear() { + submitted.forEach(this::withdraw); + } + + // Runs a single operation cycle. + private void cycle() { + //TODO consider running without rate adjustment + adjustRates(); + + long start = currentTimeMillis(); + subset(submitted).forEach(this::withdraw); + subset(withdrawn).forEach(this::submit); + long delta = currentTimeMillis() - start; + + if (delta > cyclePeriod * 3 || delta < 0) { + log.warn("Cycle took {} ms", delta); + } + + int difference = cyclePeriod - (int) delta; + if (difference > 0) { + delay(difference); + } + + lastDuration = delta; + } + + int cycleCount = 0; + + private void adjustRates() { + + int addDelta = Math.max(1000 - cycleCount, 10); + double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995); + + //FIXME need to iron out the rate adjustment + //FIXME we should taper the adjustments over time + //FIXME don't just use the lastDuration, take an average + if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec) + if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500 + lastDuration <= cyclePeriod) { + lastCount = Math.min(lastCount + addDelta, intents.size() / 2); + } else { + lastCount *= multRatio; + } + log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})", + lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput()); + } + + } + } + + // Event listener to monitor throughput. + final class Listener implements IntentListener { + + private final Counter runningTotal = new Counter(); + private volatile Map<IntentEvent.Type, Counter> counters; + + private volatile double processedThroughput = 0; + private volatile double requestThroughput = 0; + + public Listener() { + counters = initCounters(); + } + + private Map<IntentEvent.Type, Counter> initCounters() { + Map<IntentEvent.Type, Counter> map = Maps.newHashMap(); + for (IntentEvent.Type type : IntentEvent.Type.values()) { + map.put(type, new Counter()); + } + return map; + } + + public double processedThroughput() { + return processedThroughput; + } + + public double requestThroughput() { + return requestThroughput; + } + + @Override + public void event(IntentEvent event) { + if (event.subject().appId().equals(appId)) { + counters.get(event.type()).add(1); + } + } + + public void report() { + Map<IntentEvent.Type, Counter> reportCounters = counters; + counters = initCounters(); + + // update running total and latest throughput + Counter installed = reportCounters.get(INSTALLED); + Counter withdrawn = reportCounters.get(WITHDRAWN); + processedThroughput = installed.throughput() + withdrawn.throughput(); + runningTotal.add(installed.total() + withdrawn.total()); + + Counter installReq = reportCounters.get(INSTALL_REQ); + Counter withdrawReq = reportCounters.get(WITHDRAW_REQ); + requestThroughput = installReq.throughput() + withdrawReq.throughput(); + + // build the string to report + StringBuilder stringBuilder = new StringBuilder(); + for (IntentEvent.Type type : IntentEvent.Type.values()) { + Counter counter = reportCounters.get(type); + stringBuilder.append(format("%s=%.2f;", type, counter.throughput())); + } + log.info("Throughput: OVERALL={}; CURRENT={}; {}", + format("%.2f", runningTotal.throughput()), + format("%.2f", processedThroughput), + stringBuilder); + + sampleCollector.recordSample(runningTotal.throughput(), + processedThroughput); + } + } + + private class InternalControl implements ClusterMessageHandler { + @Override + public void handle(ClusterMessage message) { + String cmd = new String(message.payload()); + log.info("Received command {}", cmd); + if (cmd.equals(START)) { + startTestRun(); + } else { + stopTestRun(); + } + } + } + + private class ReporterTask extends TimerTask { + @Override + public void run() { + //adjustRates(); // FIXME we currently adjust rates in the cycle thread + listener.report(); + } + } + +} |