diff options
Diffstat (limited to 'framework/src/onos/core/net/src/main/java/org/onosproject/net/statistic/impl/StatisticManager.java')
-rw-r--r-- | framework/src/onos/core/net/src/main/java/org/onosproject/net/statistic/impl/StatisticManager.java | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/net/statistic/impl/StatisticManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/net/statistic/impl/StatisticManager.java new file mode 100644 index 00000000..996ad14e --- /dev/null +++ b/framework/src/onos/core/net/src/main/java/org/onosproject/net/statistic/impl/StatisticManager.java @@ -0,0 +1,379 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.net.statistic.impl; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.GroupId; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.Link; +import org.onosproject.net.Path; + +import org.onosproject.net.flow.FlowEntry; +import org.onosproject.net.flow.FlowRule; +import org.onosproject.net.flow.FlowRuleEvent; +import org.onosproject.net.flow.FlowRuleListener; +import org.onosproject.net.flow.FlowRuleService; +import org.onosproject.net.statistic.DefaultLoad; +import org.onosproject.net.statistic.Load; +import org.onosproject.net.statistic.StatisticService; +import org.onosproject.net.statistic.StatisticStore; +import org.slf4j.Logger; + +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.slf4j.LoggerFactory.getLogger; +import static org.onosproject.security.AppGuard.checkPermission; +import static org.onosproject.security.AppPermission.Type.*; + + +/** + * Provides an implementation of the Statistic Service. + */ +@Component(immediate = true) +@Service +public class StatisticManager implements StatisticService { + + private final Logger log = getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected FlowRuleService flowRuleService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StatisticStore statisticStore; + + + private final InternalFlowRuleListener listener = new InternalFlowRuleListener(); + + @Activate + public void activate() { + flowRuleService.addListener(listener); + log.info("Started"); + + } + + @Deactivate + public void deactivate() { + flowRuleService.removeListener(listener); + log.info("Stopped"); + } + + @Override + public Load load(Link link) { + checkPermission(STATISTIC_READ); + + return load(link.src()); + } + + @Override + public Load load(Link link, ApplicationId appId, Optional<GroupId> groupId) { + checkPermission(STATISTIC_READ); + + Statistics stats = getStatistics(link.src()); + if (!stats.isValid()) { + return new DefaultLoad(); + } + + ImmutableSet<FlowEntry> current = FluentIterable.from(stats.current()) + .filter(hasApplicationId(appId)) + .filter(hasGroupId(groupId)) + .toSet(); + ImmutableSet<FlowEntry> previous = FluentIterable.from(stats.previous()) + .filter(hasApplicationId(appId)) + .filter(hasGroupId(groupId)) + .toSet(); + + return new DefaultLoad(aggregate(current), aggregate(previous)); + } + + @Override + public Load load(ConnectPoint connectPoint) { + checkPermission(STATISTIC_READ); + + return loadInternal(connectPoint); + } + + @Override + public Link max(Path path) { + checkPermission(STATISTIC_READ); + + if (path.links().isEmpty()) { + return null; + } + Load maxLoad = new DefaultLoad(); + Link maxLink = null; + for (Link link : path.links()) { + Load load = loadInternal(link.src()); + if (load.rate() > maxLoad.rate()) { + maxLoad = load; + maxLink = link; + } + } + return maxLink; + } + + @Override + public Link min(Path path) { + checkPermission(STATISTIC_READ); + + if (path.links().isEmpty()) { + return null; + } + Load minLoad = new DefaultLoad(); + Link minLink = null; + for (Link link : path.links()) { + Load load = loadInternal(link.src()); + if (load.rate() < minLoad.rate()) { + minLoad = load; + minLink = link; + } + } + return minLink; + } + + @Override + public FlowRule highestHitter(ConnectPoint connectPoint) { + checkPermission(STATISTIC_READ); + + Set<FlowEntry> hitters = statisticStore.getCurrentStatistic(connectPoint); + if (hitters.isEmpty()) { + return null; + } + + FlowEntry max = hitters.iterator().next(); + for (FlowEntry entry : hitters) { + if (entry.bytes() > max.bytes()) { + max = entry; + } + } + return max; + } + + private Load loadInternal(ConnectPoint connectPoint) { + Statistics stats = getStatistics(connectPoint); + if (!stats.isValid()) { + return new DefaultLoad(); + } + + return new DefaultLoad(aggregate(stats.current), aggregate(stats.previous)); + } + + /** + * Returns statistics of the specified port. + * + * @param connectPoint port to query + * @return statistics + */ + private Statistics getStatistics(ConnectPoint connectPoint) { + Set<FlowEntry> current; + Set<FlowEntry> previous; + synchronized (statisticStore) { + current = getCurrentStatistic(connectPoint); + previous = getPreviousStatistic(connectPoint); + } + + return new Statistics(current, previous); + } + + /** + * Returns the current statistic of the specified port. + + * @param connectPoint port to query + * @return set of flow entries + */ + private Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) { + Set<FlowEntry> stats = statisticStore.getCurrentStatistic(connectPoint); + if (stats == null) { + return Collections.emptySet(); + } else { + return stats; + } + } + + /** + * Returns the previous statistic of the specified port. + * + * @param connectPoint port to query + * @return set of flow entries + */ + private Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) { + Set<FlowEntry> stats = statisticStore.getPreviousStatistic(connectPoint); + if (stats == null) { + return Collections.emptySet(); + } else { + return stats; + } + } + + // TODO: make aggregation function generic by passing a function + // (applying Java 8 Stream API?) + /** + * Aggregates a set of values. + * @param values the values to aggregate + * @return a long value + */ + private long aggregate(Set<FlowEntry> values) { + long sum = 0; + for (FlowEntry f : values) { + sum += f.bytes(); + } + return sum; + } + + /** + * Internal flow rule event listener. + */ + private class InternalFlowRuleListener implements FlowRuleListener { + + @Override + public void event(FlowRuleEvent event) { + FlowRule rule = event.subject(); + switch (event.type()) { + case RULE_ADDED: + case RULE_UPDATED: + if (rule instanceof FlowEntry) { + statisticStore.addOrUpdateStatistic((FlowEntry) rule); + } + break; + case RULE_ADD_REQUESTED: + statisticStore.prepareForStatistics(rule); + break; + case RULE_REMOVE_REQUESTED: + statisticStore.removeFromStatistics(rule); + break; + case RULE_REMOVED: + break; + default: + log.warn("Unknown flow rule event {}", event); + } + } + } + + /** + * Internal data class holding two set of flow entries. + */ + private static class Statistics { + private final ImmutableSet<FlowEntry> current; + private final ImmutableSet<FlowEntry> previous; + + public Statistics(Set<FlowEntry> current, Set<FlowEntry> previous) { + this.current = ImmutableSet.copyOf(checkNotNull(current)); + this.previous = ImmutableSet.copyOf(checkNotNull(previous)); + } + + /** + * Returns flow entries as the current value. + * + * @return flow entries as the current value + */ + public ImmutableSet<FlowEntry> current() { + return current; + } + + /** + * Returns flow entries as the previous value. + * + * @return flow entries as the previous value + */ + public ImmutableSet<FlowEntry> previous() { + return previous; + } + + /** + * Validates values are not empty. + * + * @return false if either of the sets is empty. Otherwise, true. + */ + public boolean isValid() { + return !(current.isEmpty() || previous.isEmpty()); + } + + @Override + public int hashCode() { + return Objects.hash(current, previous); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof Statistics)) { + return false; + } + final Statistics other = (Statistics) obj; + return Objects.equals(this.current, other.current) && Objects.equals(this.previous, other.previous); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("current", current) + .add("previous", previous) + .toString(); + } + } + + /** + * Creates a predicate that checks the application ID of a flow entry is the same as + * the specified application ID. + * + * @param appId application ID to be checked + * @return predicate + */ + private static Predicate<FlowEntry> hasApplicationId(ApplicationId appId) { + return new Predicate<FlowEntry>() { + @Override + public boolean apply(FlowEntry flowEntry) { + return flowEntry.appId() == appId.id(); + } + }; + } + + /** + * Create a predicate that checks the group ID of a flow entry is the same as + * the specified group ID. + * + * @param groupId group ID to be checked + * @return predicate + */ + private static Predicate<FlowEntry> hasGroupId(Optional<GroupId> groupId) { + return new Predicate<FlowEntry>() { + @Override + public boolean apply(FlowEntry flowEntry) { + if (!groupId.isPresent()) { + return false; + } + // FIXME: The left hand type and right hand type don't match + // FlowEntry.groupId() still returns a short value, not int. + return flowEntry.groupId().equals(groupId.get()); + } + }; + } +} |