/* * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onosproject.net.statistic.impl; import com.google.common.base.MoreObjects; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableSet; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onosproject.cli.Comparators; import org.onosproject.net.ConnectPoint; import org.onosproject.net.Device; import org.onosproject.net.Port; import org.onosproject.net.PortNumber; import org.onosproject.net.device.DeviceService; import org.onosproject.net.flow.DefaultTypedFlowEntry; import org.onosproject.net.flow.FlowEntry; import org.onosproject.net.flow.FlowRule; import org.onosproject.net.flow.FlowRuleEvent; import org.onosproject.net.flow.FlowRuleListener; import org.onosproject.net.flow.FlowRuleService; import org.onosproject.net.flow.TypedStoredFlowEntry; import org.onosproject.net.flow.instructions.Instruction; import org.onosproject.net.statistic.DefaultLoad; import org.onosproject.net.statistic.FlowStatisticService; import org.onosproject.net.statistic.Load; import org.onosproject.net.statistic.FlowStatisticStore; import org.onosproject.net.statistic.SummaryFlowEntryWithLoad; import org.onosproject.net.statistic.TypedFlowEntryWithLoad; import org.slf4j.Logger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; import static org.onosproject.security.AppGuard.checkPermission; import static org.slf4j.LoggerFactory.getLogger; import static org.onosproject.security.AppPermission.Type.*; /** * Provides an implementation of the Flow Statistic Service. */ @Component(immediate = true, enabled = true) @Service public class FlowStatisticManager implements FlowStatisticService { private final Logger log = getLogger(getClass()); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected FlowRuleService flowRuleService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected FlowStatisticStore flowStatisticStore; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceService deviceService; private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener(); @Activate public void activate() { flowRuleService.addListener(frListener); log.info("Started"); } @Deactivate public void deactivate() { flowRuleService.removeListener(frListener); log.info("Stopped"); } @Override public Map loadSummary(Device device) { checkPermission(STATISTIC_READ); Map summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR); if (device == null) { return summaryLoad; } List ports = new ArrayList<>(deviceService.getPorts(device.id())); for (Port port : ports) { ConnectPoint cp = new ConnectPoint(device.id(), port.number()); SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp); summaryLoad.put(cp, sfe); } return summaryLoad; } @Override public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) { checkPermission(STATISTIC_READ); ConnectPoint cp = new ConnectPoint(device.id(), pNumber); return loadSummaryPortInternal(cp); } @Override public Map> loadAllByType(Device device, TypedStoredFlowEntry.FlowLiveType liveType, Instruction.Type instType) { checkPermission(STATISTIC_READ); Map> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR); if (device == null) { return allLoad; } List ports = new ArrayList<>(deviceService.getPorts(device.id())); for (Port port : ports) { ConnectPoint cp = new ConnectPoint(device.id(), port.number()); List tfel = loadAllPortInternal(cp, liveType, instType); allLoad.put(cp, tfel); } return allLoad; } @Override public List loadAllByType(Device device, PortNumber pNumber, TypedStoredFlowEntry.FlowLiveType liveType, Instruction.Type instType) { checkPermission(STATISTIC_READ); ConnectPoint cp = new ConnectPoint(device.id(), pNumber); return loadAllPortInternal(cp, liveType, instType); } @Override public Map> loadTopnByType(Device device, TypedStoredFlowEntry.FlowLiveType liveType, Instruction.Type instType, int topn) { checkPermission(STATISTIC_READ); Map> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR); if (device == null) { return allLoad; } List ports = new ArrayList<>(deviceService.getPorts(device.id())); for (Port port : ports) { ConnectPoint cp = new ConnectPoint(device.id(), port.number()); List tfel = loadTopnPortInternal(cp, liveType, instType, topn); allLoad.put(cp, tfel); } return allLoad; } @Override public List loadTopnByType(Device device, PortNumber pNumber, TypedStoredFlowEntry.FlowLiveType liveType, Instruction.Type instType, int topn) { checkPermission(STATISTIC_READ); ConnectPoint cp = new ConnectPoint(device.id(), pNumber); return loadTopnPortInternal(cp, liveType, instType, topn); } private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) { checkPermission(STATISTIC_READ); Set currentStats; Set previousStats; TypedStatistics typedStatistics; synchronized (flowStatisticStore) { currentStats = flowStatisticStore.getCurrentFlowStatistic(cp); if (currentStats == null) { return new SummaryFlowEntryWithLoad(cp, new DefaultLoad()); } previousStats = flowStatisticStore.getPreviousFlowStatistic(cp); if (previousStats == null) { return new SummaryFlowEntryWithLoad(cp, new DefaultLoad()); } // copy to local flow entry typedStatistics = new TypedStatistics(currentStats, previousStats); // Check for validity of this stats data checkLoadValidity(currentStats, previousStats); } // current and previous set is not empty! Set currentSet = typedStatistics.current(); Set previousSet = typedStatistics.previous(); Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet), TypedFlowEntryWithLoad.avgPollInterval()); Map currentMap; Map previousMap; currentMap = typedStatistics.currentImmediate(); previousMap = typedStatistics.previousImmediate(); Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap), TypedFlowEntryWithLoad.shortPollInterval()); currentMap = typedStatistics.currentShort(); previousMap = typedStatistics.previousShort(); Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap), TypedFlowEntryWithLoad.shortPollInterval()); currentMap = typedStatistics.currentMid(); previousMap = typedStatistics.previousMid(); Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap), TypedFlowEntryWithLoad.midPollInterval()); currentMap = typedStatistics.currentLong(); previousMap = typedStatistics.previousLong(); Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap), TypedFlowEntryWithLoad.longPollInterval()); currentMap = typedStatistics.currentUnknown(); previousMap = typedStatistics.previousUnknown(); Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap), TypedFlowEntryWithLoad.avgPollInterval()); return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad); } private List loadAllPortInternal(ConnectPoint cp, TypedStoredFlowEntry.FlowLiveType liveType, Instruction.Type instType) { checkPermission(STATISTIC_READ); List retTfel = new ArrayList<>(); Set currentStats; Set previousStats; TypedStatistics typedStatistics; synchronized (flowStatisticStore) { currentStats = flowStatisticStore.getCurrentFlowStatistic(cp); if (currentStats == null) { return retTfel; } previousStats = flowStatisticStore.getPreviousFlowStatistic(cp); if (previousStats == null) { return retTfel; } // copy to local flow entry set typedStatistics = new TypedStatistics(currentStats, previousStats); // Check for validity of this stats data checkLoadValidity(currentStats, previousStats); } // current and previous set is not empty! boolean isAllLiveType = (liveType == null ? true : false); // null is all live type boolean isAllInstType = (instType == null ? true : false); // null is all inst type Map currentMap; Map previousMap; if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) { currentMap = typedStatistics.currentImmediate(); previousMap = typedStatistics.previousImmediate(); List fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap, isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval()); if (fel.size() > 0) { retTfel.addAll(fel); } } if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) { currentMap = typedStatistics.currentShort(); previousMap = typedStatistics.previousShort(); List fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap, isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval()); if (fel.size() > 0) { retTfel.addAll(fel); } } if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) { currentMap = typedStatistics.currentMid(); previousMap = typedStatistics.previousMid(); List fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap, isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval()); if (fel.size() > 0) { retTfel.addAll(fel); } } if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) { currentMap = typedStatistics.currentLong(); previousMap = typedStatistics.previousLong(); List fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap, isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval()); if (fel.size() > 0) { retTfel.addAll(fel); } } if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) { currentMap = typedStatistics.currentUnknown(); previousMap = typedStatistics.previousUnknown(); List fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap, isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval()); if (fel.size() > 0) { retTfel.addAll(fel); } } return retTfel; } private List typedFlowEntryLoadByInstInternal(ConnectPoint cp, Map currentMap, Map previousMap, boolean isAllInstType, Instruction.Type instType, int liveTypePollInterval) { List fel = new ArrayList<>(); for (TypedStoredFlowEntry tfe : currentMap.values()) { if (isAllInstType || tfe.treatment().allInstructions().stream(). filter(i -> i.type() == instType). findAny().isPresent()) { long currentBytes = tfe.bytes(); long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes(); Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval); fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad)); } } return fel; } private List loadTopnPortInternal(ConnectPoint cp, TypedStoredFlowEntry.FlowLiveType liveType, Instruction.Type instType, int topn) { List fel = loadAllPortInternal(cp, liveType, instType); // Sort with descending order of load List tfel = fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR). limit(topn).collect(Collectors.toList()); return tfel; } private long aggregateBytesSet(Set setFE) { return setFE.stream().mapToLong(FlowEntry::bytes).sum(); } private long aggregateBytesMap(Map mapFE) { return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum(); } /** * Internal data class holding two set of typed flow entries. */ private static class TypedStatistics { private final ImmutableSet currentAll; private final ImmutableSet previousAll; private final Map currentImmediate = new HashMap<>(); private final Map previousImmediate = new HashMap<>(); private final Map currentShort = new HashMap<>(); private final Map previousShort = new HashMap<>(); private final Map currentMid = new HashMap<>(); private final Map previousMid = new HashMap<>(); private final Map currentLong = new HashMap<>(); private final Map previousLong = new HashMap<>(); private final Map currentUnknown = new HashMap<>(); private final Map previousUnknown = new HashMap<>(); public TypedStatistics(Set current, Set previous) { this.currentAll = ImmutableSet.copyOf(checkNotNull(current)); this.previousAll = ImmutableSet.copyOf(checkNotNull(previous)); currentAll.forEach(fe -> { TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe); switch (tfe.flowLiveType()) { case IMMEDIATE_FLOW: currentImmediate.put(fe, tfe); break; case SHORT_FLOW: currentShort.put(fe, tfe); break; case MID_FLOW: currentMid.put(fe, tfe); break; case LONG_FLOW: currentLong.put(fe, tfe); break; default: currentUnknown.put(fe, tfe); break; } }); previousAll.forEach(fe -> { TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe); switch (tfe.flowLiveType()) { case IMMEDIATE_FLOW: if (currentImmediate.containsKey(fe)) { previousImmediate.put(fe, tfe); } else if (currentShort.containsKey(fe)) { previousShort.put(fe, tfe); } else if (currentMid.containsKey(fe)) { previousMid.put(fe, tfe); } else if (currentLong.containsKey(fe)) { previousLong.put(fe, tfe); } else { previousUnknown.put(fe, tfe); } break; case SHORT_FLOW: if (currentShort.containsKey(fe)) { previousShort.put(fe, tfe); } else if (currentMid.containsKey(fe)) { previousMid.put(fe, tfe); } else if (currentLong.containsKey(fe)) { previousLong.put(fe, tfe); } else { previousUnknown.put(fe, tfe); } break; case MID_FLOW: if (currentMid.containsKey(fe)) { previousMid.put(fe, tfe); } else if (currentLong.containsKey(fe)) { previousLong.put(fe, tfe); } else { previousUnknown.put(fe, tfe); } break; case LONG_FLOW: if (currentLong.containsKey(fe)) { previousLong.put(fe, tfe); } else { previousUnknown.put(fe, tfe); } break; default: previousUnknown.put(fe, tfe); break; } }); } /** * Returns flow entries as the current value. * * @return flow entries as the current value */ public ImmutableSet current() { return currentAll; } /** * Returns flow entries as the previous value. * * @return flow entries as the previous value */ public ImmutableSet previous() { return previousAll; } public Map currentImmediate() { return currentImmediate; } public Map previousImmediate() { return previousImmediate; } public Map currentShort() { return currentShort; } public Map previousShort() { return previousShort; } public Map currentMid() { return currentMid; } public Map previousMid() { return previousMid; } public Map currentLong() { return currentLong; } public Map previousLong() { return previousLong; } public Map currentUnknown() { return currentUnknown; } public Map previousUnknown() { return previousUnknown; } /** * Validates values are not empty. * * @return false if either of the sets is empty. Otherwise, true. */ public boolean isValid() { return !(currentAll.isEmpty() || previousAll.isEmpty()); } @Override public int hashCode() { return Objects.hash(currentAll, previousAll); } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (!(obj instanceof TypedStatistics)) { return false; } final TypedStatistics other = (TypedStatistics) obj; return Objects.equals(this.currentAll, other.currentAll) && Objects.equals(this.previousAll, other.previousAll); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("current", currentAll) .add("previous", previousAll) .toString(); } } private void checkLoadValidity(Set current, Set previous) { current.stream().forEach(c -> { FlowEntry f = previous.stream().filter(p -> c.equals(p)). findAny().orElse(null); if (f != null && c.bytes() < f.bytes()) { log.debug("FlowStatisticManager:checkLoadValidity():" + "Error: " + c + " :Previous bytes=" + f.bytes() + " is larger than current bytes=" + c.bytes() + " !!!"); } }); } /** * Creates a predicate that checks the instruction type of a flow entry is the same as * the specified instruction type. * * @param instType instruction type to be checked * @return predicate */ private static Predicate hasInstructionType(Instruction.Type instType) { return new Predicate() { @Override public boolean apply(FlowEntry flowEntry) { List allInstructions = flowEntry.treatment().allInstructions(); return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent(); } }; } /** * Internal flow rule event listener for FlowStatisticManager. */ private class InternalFlowRuleStatsListener implements FlowRuleListener { @Override public void event(FlowRuleEvent event) { FlowRule rule = event.subject(); switch (event.type()) { case RULE_ADDED: if (rule instanceof FlowEntry) { flowStatisticStore.addFlowStatistic((FlowEntry) rule); } break; case RULE_UPDATED: flowStatisticStore.updateFlowStatistic((FlowEntry) rule); break; case RULE_ADD_REQUESTED: break; case RULE_REMOVE_REQUESTED: break; case RULE_REMOVED: flowStatisticStore.removeFlowStatistic(rule); break; default: log.warn("Unknown flow rule event {}", event); } } } }