diff options
author | Ashlee Young <ashlee@wildernessvoice.com> | 2015-12-08 11:32:02 -0800 |
---|---|---|
committer | Ashlee Young <ashlee@wildernessvoice.com> | 2015-12-08 11:32:02 -0800 |
commit | f9da5db0ff14879e47a6d83244ba63d45b4d432d (patch) | |
tree | 30ae7124d2650f556252245ffd3c68ae1b7d845f /framework/src/onos/providers/openflow | |
parent | 76dc892491948adae5e5e62cf94448967e8d865b (diff) |
onos-1.4.0-rc1 tag
Change-Id: I7f6b85bbf2574b9e174ad919ff36acde28ca50a1
Signed-off-by: Ashlee Young <ashlee@wildernessvoice.com>
Diffstat (limited to 'framework/src/onos/providers/openflow')
9 files changed, 890 insertions, 890 deletions
diff --git a/framework/src/onos/providers/openflow/app/pom.xml b/framework/src/onos/providers/openflow/app/pom.xml index ac5f6eee..37564272 100644 --- a/framework/src/onos/providers/openflow/app/pom.xml +++ b/framework/src/onos/providers/openflow/app/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-of-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/framework/src/onos/providers/openflow/base/pom.xml b/framework/src/onos/providers/openflow/base/pom.xml index 9eb170c3..231a17a4 100644 --- a/framework/src/onos/providers/openflow/base/pom.xml +++ b/framework/src/onos/providers/openflow/base/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-of-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/framework/src/onos/providers/openflow/device/pom.xml b/framework/src/onos/providers/openflow/device/pom.xml index 0408f73e..e1c42a5d 100644 --- a/framework/src/onos/providers/openflow/device/pom.xml +++ b/framework/src/onos/providers/openflow/device/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-of-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/framework/src/onos/providers/openflow/flow/pom.xml b/framework/src/onos/providers/openflow/flow/pom.xml index 1f65656e..16d7d32d 100644 --- a/framework/src/onos/providers/openflow/flow/pom.xml +++ b/framework/src/onos/providers/openflow/flow/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-of-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java index aa8df947..d5186fa9 100644 --- a/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java +++ b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java @@ -1,882 +1,882 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.provider.of.flow.impl; - -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.onosproject.net.flow.DefaultTypedFlowEntry; -import org.onosproject.net.flow.FlowEntry; -import org.onosproject.net.flow.FlowId; -import org.onosproject.net.flow.FlowRule; -import org.onosproject.net.flow.StoredFlowEntry; -import org.onosproject.net.flow.TypedStoredFlowEntry; -import org.onosproject.net.flow.instructions.Instruction; -import org.onosproject.net.flow.instructions.Instructions; -import org.onosproject.openflow.controller.OpenFlowSwitch; -import org.onosproject.openflow.controller.RoleState; -import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest; -import org.projectfloodlight.openflow.protocol.match.Match; -import org.projectfloodlight.openflow.types.OFPort; -import org.projectfloodlight.openflow.types.TableId; -import org.slf4j.Logger; - -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onlab.util.Tools.groupedThreads; -import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType; -import static org.slf4j.LoggerFactory.getLogger; - -/** - * Efficiently and adaptively collects flow statistics for the specified switch. - */ -public class NewAdaptiveFlowStatsCollector { - - private final Logger log = getLogger(getClass()); - - private final OpenFlowSwitch sw; - - private ScheduledExecutorService adaptiveFlowStatsScheduler = - Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d")); - private ScheduledFuture<?> calAndShortFlowsThread; - private ScheduledFuture<?> midFlowsThread; - private ScheduledFuture<?> longFlowsThread; - - // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval - private CalAndShortFlowsTask calAndShortFlowsTask; - // Task that collects stats MID flows every 2*calAndPollInterval - private MidFlowsTask midFlowsTask; - // Task that collects stats LONG flows every 3*calAndPollInterval - private LongFlowsTask longFlowsTask; - - private static final int CAL_AND_POLL_TIMES = 1; // must be always 0 - private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1 - private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES - //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable - // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES - private static final int ENTIRE_POLL_TIMES = 6; - - private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5; - private static final int MIN_CAL_AND_POLL_FREQUENCY = 2; - private static final int MAX_CAL_AND_POLL_FREQUENCY = 60; - - private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; - private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; - private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; - // only used for checking condition at each task if it collects entire flows from a given switch or not - private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; - - // Number of call count of each Task, - // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask - private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called - private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called - private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called - - private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable(); - - private boolean isFirstTimeStart = true; - - public static final long NO_FLOW_MISSING_XID = (-1); - private long flowMissingXid = NO_FLOW_MISSING_XID; - - /** - * Creates a new adaptive collector for the given switch and default cal_and_poll frequency. - * - * @param sw switch to pull - * @param pollInterval cal and immediate poll frequency in seconds - */ - NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) { - this.sw = sw; - - initMemberVars(pollInterval); - } - - // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count - private void initMemberVars(int pollInterval) { - if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) { - this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY; - } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) { - this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY; - } else { - this.calAndPollInterval = pollInterval; - } - - calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval; - midPollInterval = MID_POLL_TIMES * calAndPollInterval; - longPollInterval = LONG_POLL_TIMES * calAndPollInterval; - entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval; - - callCountCalAndShortFlowsTask = 0; - callCountMidFlowsTask = 0; - callCountLongFlowsTask = 0; - - flowMissingXid = NO_FLOW_MISSING_XID; - } - - /** - * Adjusts adaptive poll frequency. - * - * @param pollInterval poll frequency in seconds - */ - synchronized void adjustCalAndPollInterval(int pollInterval) { - initMemberVars(pollInterval); - - if (calAndShortFlowsThread != null) { - calAndShortFlowsThread.cancel(false); - } - if (midFlowsThread != null) { - midFlowsThread.cancel(false); - } - if (longFlowsThread != null) { - longFlowsThread.cancel(false); - } - - calAndShortFlowsTask = new CalAndShortFlowsTask(); - calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( - calAndShortFlowsTask, - 0, - calAndPollInterval, - TimeUnit.SECONDS); - - midFlowsTask = new MidFlowsTask(); - midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( - midFlowsTask, - 0, - midPollInterval, - TimeUnit.SECONDS); - - longFlowsTask = new LongFlowsTask(); - longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( - longFlowsTask, - 0, - longPollInterval, - TimeUnit.SECONDS); - - log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted"); - } - - private class CalAndShortFlowsTask implements Runnable { - @Override - public void run() { - if (sw.getRole() == RoleState.MASTER) { - log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); - - if (isFirstTimeStart) { - // isFirstTimeStart, get entire flow stats from a given switch sw - log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}", - sw.getStringId()); - ofFlowStatsRequestAllSend(); - - callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES; - isFirstTimeStart = false; - } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) { - // entire_poll_times, get entire flow stats from a given switch sw - log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId()); - ofFlowStatsRequestAllSend(); - - callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES; - //TODO: check flows deleted in switch, but exist in controller flow table, then remove them - // - } else { - calAndShortFlowsTaskInternal(); - callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES; - } - } - } - } - - // send openflow flow stats request message with getting all flow entries to a given switch sw - private void ofFlowStatsRequestAllSend() { - OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() - .setMatch(sw.factory().matchWildcardAll()) - .setTableId(TableId.ALL) - .setOutPort(OFPort.NO_MASK) - .build(); - - synchronized (this) { - // set the request xid to check the reply in OpenFlowRuleProvider - // After processing the reply of this request message, - // this must be set to NO_FLOW_MISSING_XID(-1) by provider - setFlowMissingXid(request.getXid()); - log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId()); - - sw.sendMsg(request); - } - } - - // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw - private void ofFlowStatsRequestFlowSend(FlowEntry fe) { - // set find match - Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(), - Optional.empty()).buildMatch(); - // set find tableId - TableId tableId = TableId.of(fe.tableId()); - // set output port - Instruction ins = fe.treatment().allInstructions().stream() - .filter(i -> (i.type() == Instruction.Type.OUTPUT)) - .findFirst() - .orElse(null); - OFPort ofPort = OFPort.NO_MASK; - if (ins != null) { - Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins; - ofPort = OFPort.of((int) ((out.port().toLong()))); - } - - OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() - .setMatch(match) - .setTableId(tableId) - .setOutPort(ofPort) - .build(); - - synchronized (this) { - if (getFlowMissingXid() != NO_FLOW_MISSING_XID) { - log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet," - + " set no flow missing xid anyway, for {}", - sw.getStringId()); - setFlowMissingXid(NO_FLOW_MISSING_XID); - } - - sw.sendMsg(request); - } - } - - private void calAndShortFlowsTaskInternal() { - deviceFlowTable.checkAndMoveLiveFlowAll(); - - deviceFlowTable.getShortFlows().forEach(fe -> { - ofFlowStatsRequestFlowSend(fe); - }); - } - - private class MidFlowsTask implements Runnable { - @Override - public void run() { - if (sw.getRole() == RoleState.MASTER) { - log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); - - // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw - if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) { - callCountMidFlowsTask = MID_POLL_TIMES; - } else { - midFlowsTaskInternal(); - callCountMidFlowsTask += MID_POLL_TIMES; - } - } - } - } - - private void midFlowsTaskInternal() { - deviceFlowTable.getMidFlows().forEach(fe -> { - ofFlowStatsRequestFlowSend(fe); - }); - } - - private class LongFlowsTask implements Runnable { - @Override - public void run() { - if (sw.getRole() == RoleState.MASTER) { - log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); - - // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw - if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) { - callCountLongFlowsTask = LONG_POLL_TIMES; - } else { - longFlowsTaskInternal(); - callCountLongFlowsTask += LONG_POLL_TIMES; - } - } - } - } - - private void longFlowsTaskInternal() { - deviceFlowTable.getLongFlows().forEach(fe -> { - ofFlowStatsRequestFlowSend(fe); - }); - } - - /** - * start adaptive flow statistic collection. - * - */ - public synchronized void start() { - log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId()); - callCountCalAndShortFlowsTask = 0; - callCountMidFlowsTask = 0; - callCountLongFlowsTask = 0; - - isFirstTimeStart = true; - - // Initially start polling quickly. Then drop down to configured value - calAndShortFlowsTask = new CalAndShortFlowsTask(); - calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( - calAndShortFlowsTask, - 1, - calAndPollInterval, - TimeUnit.SECONDS); - - midFlowsTask = new MidFlowsTask(); - midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( - midFlowsTask, - 1, - midPollInterval, - TimeUnit.SECONDS); - - longFlowsTask = new LongFlowsTask(); - longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( - longFlowsTask, - 1, - longPollInterval, - TimeUnit.SECONDS); - - log.info("Started"); - } - - /** - * stop adaptive flow statistic collection. - * - */ - public synchronized void stop() { - log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId()); - if (calAndShortFlowsThread != null) { - calAndShortFlowsThread.cancel(true); - } - if (midFlowsThread != null) { - midFlowsThread.cancel(true); - } - if (longFlowsThread != null) { - longFlowsThread.cancel(true); - } - - adaptiveFlowStatsScheduler.shutdownNow(); - - isFirstTimeStart = false; - - log.info("Stopped"); - } - - /** - * add typed flow entry from flow rule into the internal flow table. - * - * @param flowRules the flow rules - * - */ - public synchronized void addWithFlowRule(FlowRule... flowRules) { - for (FlowRule fr : flowRules) { - // First remove old entry unconditionally, if exist - deviceFlowTable.remove(fr); - - // add new flow entry, we suppose IMMEDIATE_FLOW - TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr, - FlowLiveType.IMMEDIATE_FLOW); - deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry); - } - } - - /** - * add or update typed flow entry from flow entry into the internal flow table. - * - * @param flowEntries the flow entries - * - */ - public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) { - for (FlowEntry fe : flowEntries) { - // check if this new rule is an update to an existing entry - TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe); - - if (stored != null) { - // duplicated flow entry is collected!, just skip - if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets() - && fe.life() == stored.life()) { - log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value()) - + ",is DUPLICATED stats collection, just skip." - + " AdaptiveStats collection thread for {}", - sw.getStringId()); - - stored.setLastSeen(); - continue; - } else if (fe.life() < stored.life()) { - // Invalid updates the stats values, i.e., bytes, packets, durations ... - log.debug("addOrUpdateFlows():" + - " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." + - " new flowId=" + Long.toHexString(fe.id().value()) + - ", old flowId=" + Long.toHexString(stored.id().value()) + - ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() + - ", new life=" + fe.life() + ", old life=" + stored.life() + - ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen()); - // go next - stored.setLastSeen(); - continue; - } - - // update now - stored.setLife(fe.life()); - stored.setPackets(fe.packets()); - stored.setBytes(fe.bytes()); - stored.setLastSeen(); - if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) { - // flow is really RULE_ADDED - stored.setState(FlowEntry.FlowEntryState.ADDED); - } - // flow is RULE_UPDATED, skip adding and just updating flow live table - //deviceFlowTable.calAndSetFlowLiveType(stored); - continue; - } - - // add new flow entry, we suppose IMMEDIATE_FLOW - TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe, - FlowLiveType.IMMEDIATE_FLOW); - deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry); - } - } - - /** - * remove typed flow entry from the internal flow table. - * - * @param flowRules the flow entries - * - */ - public synchronized void removeFlows(FlowRule... flowRules) { - for (FlowRule rule : flowRules) { - deviceFlowTable.remove(rule); - } - } - - // same as removeFlows() function - /** - * remove typed flow entry from the internal flow table. - * - * @param flowRules the flow entries - * - */ - public void flowRemoved(FlowRule... flowRules) { - removeFlows(flowRules); - } - - // same as addOrUpdateFlows() function - /** - * add or update typed flow entry from flow entry into the internal flow table. - * - * @param flowEntries the flow entry list - * - */ - public void pushFlowMetrics(List<FlowEntry> flowEntries) { - flowEntries.forEach(fe -> { - addOrUpdateFlows(fe); - }); - } - - /** - * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)). - * - * @return xid of missing flow - */ - public long getFlowMissingXid() { - return flowMissingXid; - } - - /** - * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id. - * - * @param flowMissingXid the OFFlowStatsRequest message Id - * - */ - public void setFlowMissingXid(long flowMissingXid) { - this.flowMissingXid = flowMissingXid; - } - - private class InternalDeviceFlowTable { - - private final Map<FlowId, Set<TypedStoredFlowEntry>> - flowEntries = Maps.newConcurrentMap(); - - private final Set<StoredFlowEntry> shortFlows = new HashSet<>(); - private final Set<StoredFlowEntry> midFlows = new HashSet<>(); - private final Set<StoredFlowEntry> longFlows = new HashSet<>(); - - // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply - private final long latencyFlowStatsRequestAndReplyMillis = 500; - - - // Statistics for table operation - private long addCount = 0, addWithSetFlowLiveTypeCount = 0; - private long removeCount = 0; - - /** - * Resets all count values with zero. - * - */ - public void resetAllCount() { - addCount = 0; - addWithSetFlowLiveTypeCount = 0; - removeCount = 0; - } - - // get set of flow entries for the given flowId - private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) { - return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet()); - } - - // get flow entry for the given flow rule - private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) { - Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id()); - return flowEntries.stream() - .filter(entry -> Objects.equal(entry, rule)) - .findAny() - .orElse(null); - } - - // get the flow entries for all flows in flow table - private Set<TypedStoredFlowEntry> getFlowEntriesInternal() { - Set<TypedStoredFlowEntry> result = Sets.newHashSet(); - - flowEntries.values().forEach(result::addAll); - return result; - } - - /** - * Gets the number of flow entry in flow table. - * - * @return the number of flow entry. - * - */ - public long getFlowCount() { - return flowEntries.values().stream().mapToLong(Set::size).sum(); - } - - /** - * Gets the number of flow entry in flow table. - * - * @param rule the flow rule - * @return the typed flow entry. - * - */ - public TypedStoredFlowEntry getFlowEntry(FlowRule rule) { - checkNotNull(rule); - - return getFlowEntryInternal(rule); - } - - /** - * Gets the all typed flow entries in flow table. - * - * @return the set of typed flow entry. - * - */ - public Set<TypedStoredFlowEntry> getFlowEntries() { - return getFlowEntriesInternal(); - } - - /** - * Gets the short typed flow entries in flow table. - * - * @return the set of typed flow entry. - * - */ - public Set<StoredFlowEntry> getShortFlows() { - return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows); - } - - /** - * Gets the mid typed flow entries in flow table. - * - * @return the set of typed flow entry. - * - */ - public Set<StoredFlowEntry> getMidFlows() { - return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows); - } - - /** - * Gets the long typed flow entries in flow table. - * - * @return the set of typed flow entry. - * - */ - public Set<StoredFlowEntry> getLongFlows() { - return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows); - } - - /** - * Add typed flow entry into table only. - * - * @param rule the flow rule - * - */ - public synchronized void add(TypedStoredFlowEntry rule) { - checkNotNull(rule); - - //rule have to be new DefaultTypedFlowEntry - boolean result = getFlowEntriesInternal(rule.id()).add(rule); - - if (result) { - addCount++; - } - } - - /** - * Calculates and set the flow live type at the first time, - * and then add it into a corresponding typed flow table. - * - * @param rule the flow rule - * - */ - public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) { - checkNotNull(rule); - - calAndSetFlowLiveTypeInternal(rule); - } - - /** - * Add the typed flow entry into table, and calculates and set the flow live type, - * and then add it into a corresponding typed flow table. - * - * @param rule the flow rule - * - */ - public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) { - checkNotNull(rule); - - //rule have to be new DefaultTypedFlowEntry - boolean result = getFlowEntriesInternal(rule.id()).add(rule); - if (result) { - calAndSetFlowLiveTypeInternal(rule); - addWithSetFlowLiveTypeCount++; - } else { - log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value()) - + " ADD Failed, cause it may already exists in table !!!," - + " AdaptiveStats collection thread for {}", - sw.getStringId()); - } - } - - // In real, calculates and set the flow live type at the first time, - // and then add it into a corresponding typed flow table - private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) { - long life = rule.life(); - FlowLiveType prevFlowLiveType = rule.flowLiveType(); - - if (life >= longPollInterval) { - rule.setFlowLiveType(FlowLiveType.LONG_FLOW); - longFlows.add(rule); - } else if (life >= midPollInterval) { - rule.setFlowLiveType(FlowLiveType.MID_FLOW); - midFlows.add(rule); - } else if (life >= calAndPollInterval) { - rule.setFlowLiveType(FlowLiveType.SHORT_FLOW); - shortFlows.add(rule); - } else if (life >= 0) { - rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW); - } else { // life < 0 - rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW); - } - - if (rule.flowLiveType() != prevFlowLiveType) { - switch (prevFlowLiveType) { - // delete it from previous flow table - case SHORT_FLOW: - shortFlows.remove(rule); - break; - case MID_FLOW: - midFlows.remove(rule); - break; - case LONG_FLOW: - longFlows.remove(rule); - break; - default: - break; - } - } - } - - - // check the flow live type based on current time, then set and add it into corresponding table - private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) { - long curTime = (cTime > 0 ? cTime : System.currentTimeMillis()); - // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply - long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000); - // fe.life() unit is SECOND! - long liveTime = fe.life() + fromLastSeen; - - - switch (fe.flowLiveType()) { - case IMMEDIATE_FLOW: - if (liveTime >= longPollInterval) { - fe.setFlowLiveType(FlowLiveType.LONG_FLOW); - longFlows.add(fe); - } else if (liveTime >= midPollInterval) { - fe.setFlowLiveType(FlowLiveType.MID_FLOW); - midFlows.add(fe); - } else if (liveTime >= calAndPollInterval) { - fe.setFlowLiveType(FlowLiveType.SHORT_FLOW); - shortFlows.add(fe); - } - break; - case SHORT_FLOW: - if (liveTime >= longPollInterval) { - fe.setFlowLiveType(FlowLiveType.LONG_FLOW); - shortFlows.remove(fe); - longFlows.add(fe); - } else if (liveTime >= midPollInterval) { - fe.setFlowLiveType(FlowLiveType.MID_FLOW); - shortFlows.remove(fe); - midFlows.add(fe); - } - break; - case MID_FLOW: - if (liveTime >= longPollInterval) { - fe.setFlowLiveType(FlowLiveType.LONG_FLOW); - midFlows.remove(fe); - longFlows.add(fe); - } - break; - case LONG_FLOW: - if (fromLastSeen > entirePollInterval) { - log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch."); - return false; - } - break; - case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through - default : - // Error Unknown Live Type - log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!" - + "AdaptiveStats collection thread for {}", - sw.getStringId()); - return false; - } - - log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value()) - + ", state=" + fe.state() - + ", After liveType=" + fe.flowLiveType() - + ", liveTime=" + liveTime - + ", life=" + fe.life() - + ", bytes=" + fe.bytes() - + ", packets=" + fe.packets() - + ", fromLastSeen=" + fromLastSeen - + ", priority=" + fe.priority() - + ", selector=" + fe.selector().criteria() - + ", treatment=" + fe.treatment() - + " AdaptiveStats collection thread for {}", - sw.getStringId()); - - return true; - } - - /** - * Check and move live type for all type flow entries in table at every calAndPollInterval time. - * - */ - public void checkAndMoveLiveFlowAll() { - Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal(); - - long calCurTime = System.currentTimeMillis(); - typedFlowEntries.forEach(fe -> { - if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) { - remove(fe); - } - }); - - // print table counts for debug - if (log.isDebugEnabled()) { - synchronized (this) { - long totalFlowCount = getFlowCount(); - long shortFlowCount = shortFlows.size(); - long midFlowCount = midFlows.size(); - long longFlowCount = longFlows.size(); - long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount; - long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount; - - log.debug("--------------------------------------------------------------------------- for {}", - sw.getStringId()); - log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount - + ", add - remove_Count=" + calTotalCount - + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount - + ", SHORT_FLOW_Count=" + shortFlowCount - + ", MID_FLOW_Count=" + midFlowCount - + ", LONG_FLOW_Count=" + longFlowCount - + ", add_Count=" + addCount - + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount - + ", remove_Count=" + removeCount - + " AdaptiveStats collection thread for {}", sw.getStringId()); - log.debug("--------------------------------------------------------------------------- for {}", - sw.getStringId()); - if (totalFlowCount != calTotalCount) { - log.error("checkAndMoveLiveFlowAll, Real total flow count and " - + "calculated total flow count do NOT match, something is wrong internally " - + "or check counter value bound is over!"); - } - if (immediateFlowCount < 0) { - log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, " - + "something is wrong internally " - + "or check counter value bound is over!"); - } - } - } - log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId()); - } - - /** - * Remove the typed flow entry from table. - * - * @param rule the flow rule - * - */ - public synchronized void remove(FlowRule rule) { - checkNotNull(rule); - - TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule); - if (removeStore != null) { - removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore); - boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore); - - if (result) { - removeCount++; - } - } - } - - // Remove the typed flow entry from corresponding table - private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) { - switch (fe.flowLiveType()) { - case IMMEDIATE_FLOW: - // do nothing - break; - case SHORT_FLOW: - shortFlows.remove(fe); - break; - case MID_FLOW: - midFlows.remove(fe); - break; - case LONG_FLOW: - longFlows.remove(fe); - break; - default: // error in Flow Live Type - log.error("removeLiveFlowsInternal, Unknown Live Type error!"); - break; - } - } - } -} +/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.of.flow.impl;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onosproject.net.flow.DefaultTypedFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.TypedStoredFlowEntry;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.openflow.controller.OpenFlowSwitch;
+import org.onosproject.openflow.controller.RoleState;
+import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
+import org.projectfloodlight.openflow.protocol.match.Match;
+import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.TableId;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Efficiently and adaptively collects flow statistics for the specified switch.
+ */
+public class NewAdaptiveFlowStatsCollector {
+
+ private final Logger log = getLogger(getClass());
+
+ private final OpenFlowSwitch sw;
+
+ private ScheduledExecutorService adaptiveFlowStatsScheduler =
+ Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
+ private ScheduledFuture<?> calAndShortFlowsThread;
+ private ScheduledFuture<?> midFlowsThread;
+ private ScheduledFuture<?> longFlowsThread;
+
+ // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
+ private CalAndShortFlowsTask calAndShortFlowsTask;
+ // Task that collects stats MID flows every 2*calAndPollInterval
+ private MidFlowsTask midFlowsTask;
+ // Task that collects stats LONG flows every 3*calAndPollInterval
+ private LongFlowsTask longFlowsTask;
+
+ private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
+ private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
+ private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
+ //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
+ // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
+ private static final int ENTIRE_POLL_TIMES = 6;
+
+ private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
+ private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
+ private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
+
+ private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
+ private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
+ private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
+ // only used for checking condition at each task if it collects entire flows from a given switch or not
+ private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
+
+ // Number of call count of each Task,
+ // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
+ private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
+ private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
+ private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
+
+ private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
+
+ private boolean isFirstTimeStart = true;
+
+ public static final long NO_FLOW_MISSING_XID = (-1);
+ private long flowMissingXid = NO_FLOW_MISSING_XID;
+
+ /**
+ * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
+ *
+ * @param sw switch to pull
+ * @param pollInterval cal and immediate poll frequency in seconds
+ */
+ NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
+ this.sw = sw;
+
+ initMemberVars(pollInterval);
+ }
+
+ // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
+ private void initMemberVars(int pollInterval) {
+ if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
+ this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
+ } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
+ this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
+ } else {
+ this.calAndPollInterval = pollInterval;
+ }
+
+ calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
+ midPollInterval = MID_POLL_TIMES * calAndPollInterval;
+ longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
+ entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
+
+ callCountCalAndShortFlowsTask = 0;
+ callCountMidFlowsTask = 0;
+ callCountLongFlowsTask = 0;
+
+ flowMissingXid = NO_FLOW_MISSING_XID;
+ }
+
+ /**
+ * Adjusts adaptive poll frequency.
+ *
+ * @param pollInterval poll frequency in seconds
+ */
+ synchronized void adjustCalAndPollInterval(int pollInterval) {
+ initMemberVars(pollInterval);
+
+ if (calAndShortFlowsThread != null) {
+ calAndShortFlowsThread.cancel(false);
+ }
+ if (midFlowsThread != null) {
+ midFlowsThread.cancel(false);
+ }
+ if (longFlowsThread != null) {
+ longFlowsThread.cancel(false);
+ }
+
+ calAndShortFlowsTask = new CalAndShortFlowsTask();
+ calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
+ calAndShortFlowsTask,
+ 0,
+ calAndPollInterval,
+ TimeUnit.SECONDS);
+
+ midFlowsTask = new MidFlowsTask();
+ midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
+ midFlowsTask,
+ 0,
+ midPollInterval,
+ TimeUnit.SECONDS);
+
+ longFlowsTask = new LongFlowsTask();
+ longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
+ longFlowsTask,
+ 0,
+ longPollInterval,
+ TimeUnit.SECONDS);
+
+ log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
+ }
+
+ private class CalAndShortFlowsTask implements Runnable {
+ @Override
+ public void run() {
+ if (sw.getRole() == RoleState.MASTER) {
+ log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
+
+ if (isFirstTimeStart) {
+ // isFirstTimeStart, get entire flow stats from a given switch sw
+ log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
+ sw.getStringId());
+ ofFlowStatsRequestAllSend();
+
+ callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
+ isFirstTimeStart = false;
+ } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
+ // entire_poll_times, get entire flow stats from a given switch sw
+ log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
+ ofFlowStatsRequestAllSend();
+
+ callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
+ //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
+ //
+ } else {
+ calAndShortFlowsTaskInternal();
+ callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
+ }
+ }
+ }
+ }
+
+ // send openflow flow stats request message with getting all flow entries to a given switch sw
+ private void ofFlowStatsRequestAllSend() {
+ OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
+ .setMatch(sw.factory().matchWildcardAll())
+ .setTableId(TableId.ALL)
+ .setOutPort(OFPort.NO_MASK)
+ .build();
+
+ synchronized (this) {
+ // set the request xid to check the reply in OpenFlowRuleProvider
+ // After processing the reply of this request message,
+ // this must be set to NO_FLOW_MISSING_XID(-1) by provider
+ setFlowMissingXid(request.getXid());
+ log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
+
+ sw.sendMsg(request);
+ }
+ }
+
+ // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
+ private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
+ // set find match
+ Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
+ Optional.empty()).buildMatch();
+ // set find tableId
+ TableId tableId = TableId.of(fe.tableId());
+ // set output port
+ Instruction ins = fe.treatment().allInstructions().stream()
+ .filter(i -> (i.type() == Instruction.Type.OUTPUT))
+ .findFirst()
+ .orElse(null);
+ OFPort ofPort = OFPort.NO_MASK;
+ if (ins != null) {
+ Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
+ ofPort = OFPort.of((int) ((out.port().toLong())));
+ }
+
+ OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
+ .setMatch(match)
+ .setTableId(tableId)
+ .setOutPort(ofPort)
+ .build();
+
+ synchronized (this) {
+ if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
+ log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
+ + " set no flow missing xid anyway, for {}",
+ sw.getStringId());
+ setFlowMissingXid(NO_FLOW_MISSING_XID);
+ }
+
+ sw.sendMsg(request);
+ }
+ }
+
+ private void calAndShortFlowsTaskInternal() {
+ deviceFlowTable.checkAndMoveLiveFlowAll();
+
+ deviceFlowTable.getShortFlows().forEach(fe -> {
+ ofFlowStatsRequestFlowSend(fe);
+ });
+ }
+
+ private class MidFlowsTask implements Runnable {
+ @Override
+ public void run() {
+ if (sw.getRole() == RoleState.MASTER) {
+ log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
+
+ // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
+ if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
+ callCountMidFlowsTask = MID_POLL_TIMES;
+ } else {
+ midFlowsTaskInternal();
+ callCountMidFlowsTask += MID_POLL_TIMES;
+ }
+ }
+ }
+ }
+
+ private void midFlowsTaskInternal() {
+ deviceFlowTable.getMidFlows().forEach(fe -> {
+ ofFlowStatsRequestFlowSend(fe);
+ });
+ }
+
+ private class LongFlowsTask implements Runnable {
+ @Override
+ public void run() {
+ if (sw.getRole() == RoleState.MASTER) {
+ log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
+
+ // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
+ if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
+ callCountLongFlowsTask = LONG_POLL_TIMES;
+ } else {
+ longFlowsTaskInternal();
+ callCountLongFlowsTask += LONG_POLL_TIMES;
+ }
+ }
+ }
+ }
+
+ private void longFlowsTaskInternal() {
+ deviceFlowTable.getLongFlows().forEach(fe -> {
+ ofFlowStatsRequestFlowSend(fe);
+ });
+ }
+
+ /**
+ * start adaptive flow statistic collection.
+ *
+ */
+ public synchronized void start() {
+ log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
+ callCountCalAndShortFlowsTask = 0;
+ callCountMidFlowsTask = 0;
+ callCountLongFlowsTask = 0;
+
+ isFirstTimeStart = true;
+
+ // Initially start polling quickly. Then drop down to configured value
+ calAndShortFlowsTask = new CalAndShortFlowsTask();
+ calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
+ calAndShortFlowsTask,
+ 1,
+ calAndPollInterval,
+ TimeUnit.SECONDS);
+
+ midFlowsTask = new MidFlowsTask();
+ midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
+ midFlowsTask,
+ 1,
+ midPollInterval,
+ TimeUnit.SECONDS);
+
+ longFlowsTask = new LongFlowsTask();
+ longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
+ longFlowsTask,
+ 1,
+ longPollInterval,
+ TimeUnit.SECONDS);
+
+ log.info("Started");
+ }
+
+ /**
+ * stop adaptive flow statistic collection.
+ *
+ */
+ public synchronized void stop() {
+ log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
+ if (calAndShortFlowsThread != null) {
+ calAndShortFlowsThread.cancel(true);
+ }
+ if (midFlowsThread != null) {
+ midFlowsThread.cancel(true);
+ }
+ if (longFlowsThread != null) {
+ longFlowsThread.cancel(true);
+ }
+
+ adaptiveFlowStatsScheduler.shutdownNow();
+
+ isFirstTimeStart = false;
+
+ log.info("Stopped");
+ }
+
+ /**
+ * add typed flow entry from flow rule into the internal flow table.
+ *
+ * @param flowRules the flow rules
+ *
+ */
+ public synchronized void addWithFlowRule(FlowRule... flowRules) {
+ for (FlowRule fr : flowRules) {
+ // First remove old entry unconditionally, if exist
+ deviceFlowTable.remove(fr);
+
+ // add new flow entry, we suppose IMMEDIATE_FLOW
+ TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
+ FlowLiveType.IMMEDIATE_FLOW);
+ deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
+ }
+ }
+
+ /**
+ * add or update typed flow entry from flow entry into the internal flow table.
+ *
+ * @param flowEntries the flow entries
+ *
+ */
+ public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
+ for (FlowEntry fe : flowEntries) {
+ // check if this new rule is an update to an existing entry
+ TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
+
+ if (stored != null) {
+ // duplicated flow entry is collected!, just skip
+ if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
+ && fe.life() == stored.life()) {
+ log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
+ + ",is DUPLICATED stats collection, just skip."
+ + " AdaptiveStats collection thread for {}",
+ sw.getStringId());
+
+ stored.setLastSeen();
+ continue;
+ } else if (fe.life() < stored.life()) {
+ // Invalid updates the stats values, i.e., bytes, packets, durations ...
+ log.debug("addOrUpdateFlows():" +
+ " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
+ " new flowId=" + Long.toHexString(fe.id().value()) +
+ ", old flowId=" + Long.toHexString(stored.id().value()) +
+ ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
+ ", new life=" + fe.life() + ", old life=" + stored.life() +
+ ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
+ // go next
+ stored.setLastSeen();
+ continue;
+ }
+
+ // update now
+ stored.setLife(fe.life());
+ stored.setPackets(fe.packets());
+ stored.setBytes(fe.bytes());
+ stored.setLastSeen();
+ if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
+ // flow is really RULE_ADDED
+ stored.setState(FlowEntry.FlowEntryState.ADDED);
+ }
+ // flow is RULE_UPDATED, skip adding and just updating flow live table
+ //deviceFlowTable.calAndSetFlowLiveType(stored);
+ continue;
+ }
+
+ // add new flow entry, we suppose IMMEDIATE_FLOW
+ TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
+ FlowLiveType.IMMEDIATE_FLOW);
+ deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
+ }
+ }
+
+ /**
+ * remove typed flow entry from the internal flow table.
+ *
+ * @param flowRules the flow entries
+ *
+ */
+ public synchronized void removeFlows(FlowRule... flowRules) {
+ for (FlowRule rule : flowRules) {
+ deviceFlowTable.remove(rule);
+ }
+ }
+
+ // same as removeFlows() function
+ /**
+ * remove typed flow entry from the internal flow table.
+ *
+ * @param flowRules the flow entries
+ *
+ */
+ public void flowRemoved(FlowRule... flowRules) {
+ removeFlows(flowRules);
+ }
+
+ // same as addOrUpdateFlows() function
+ /**
+ * add or update typed flow entry from flow entry into the internal flow table.
+ *
+ * @param flowEntries the flow entry list
+ *
+ */
+ public void pushFlowMetrics(List<FlowEntry> flowEntries) {
+ flowEntries.forEach(fe -> {
+ addOrUpdateFlows(fe);
+ });
+ }
+
+ /**
+ * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
+ *
+ * @return xid of missing flow
+ */
+ public long getFlowMissingXid() {
+ return flowMissingXid;
+ }
+
+ /**
+ * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
+ *
+ * @param flowMissingXid the OFFlowStatsRequest message Id
+ *
+ */
+ public void setFlowMissingXid(long flowMissingXid) {
+ this.flowMissingXid = flowMissingXid;
+ }
+
+ private class InternalDeviceFlowTable {
+
+ private final Map<FlowId, Set<TypedStoredFlowEntry>>
+ flowEntries = Maps.newConcurrentMap();
+
+ private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
+ private final Set<StoredFlowEntry> midFlows = new HashSet<>();
+ private final Set<StoredFlowEntry> longFlows = new HashSet<>();
+
+ // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
+ private final long latencyFlowStatsRequestAndReplyMillis = 500;
+
+
+ // Statistics for table operation
+ private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
+ private long removeCount = 0;
+
+ /**
+ * Resets all count values with zero.
+ *
+ */
+ public void resetAllCount() {
+ addCount = 0;
+ addWithSetFlowLiveTypeCount = 0;
+ removeCount = 0;
+ }
+
+ // get set of flow entries for the given flowId
+ private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
+ return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
+ }
+
+ // get flow entry for the given flow rule
+ private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
+ Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
+ return flowEntries.stream()
+ .filter(entry -> Objects.equal(entry, rule))
+ .findAny()
+ .orElse(null);
+ }
+
+ // get the flow entries for all flows in flow table
+ private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
+ Set<TypedStoredFlowEntry> result = Sets.newHashSet();
+
+ flowEntries.values().forEach(result::addAll);
+ return result;
+ }
+
+ /**
+ * Gets the number of flow entry in flow table.
+ *
+ * @return the number of flow entry.
+ *
+ */
+ public long getFlowCount() {
+ return flowEntries.values().stream().mapToLong(Set::size).sum();
+ }
+
+ /**
+ * Gets the number of flow entry in flow table.
+ *
+ * @param rule the flow rule
+ * @return the typed flow entry.
+ *
+ */
+ public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
+ checkNotNull(rule);
+
+ return getFlowEntryInternal(rule);
+ }
+
+ /**
+ * Gets the all typed flow entries in flow table.
+ *
+ * @return the set of typed flow entry.
+ *
+ */
+ public Set<TypedStoredFlowEntry> getFlowEntries() {
+ return getFlowEntriesInternal();
+ }
+
+ /**
+ * Gets the short typed flow entries in flow table.
+ *
+ * @return the set of typed flow entry.
+ *
+ */
+ public Set<StoredFlowEntry> getShortFlows() {
+ return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
+ }
+
+ /**
+ * Gets the mid typed flow entries in flow table.
+ *
+ * @return the set of typed flow entry.
+ *
+ */
+ public Set<StoredFlowEntry> getMidFlows() {
+ return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
+ }
+
+ /**
+ * Gets the long typed flow entries in flow table.
+ *
+ * @return the set of typed flow entry.
+ *
+ */
+ public Set<StoredFlowEntry> getLongFlows() {
+ return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
+ }
+
+ /**
+ * Add typed flow entry into table only.
+ *
+ * @param rule the flow rule
+ *
+ */
+ public synchronized void add(TypedStoredFlowEntry rule) {
+ checkNotNull(rule);
+
+ //rule have to be new DefaultTypedFlowEntry
+ boolean result = getFlowEntriesInternal(rule.id()).add(rule);
+
+ if (result) {
+ addCount++;
+ }
+ }
+
+ /**
+ * Calculates and set the flow live type at the first time,
+ * and then add it into a corresponding typed flow table.
+ *
+ * @param rule the flow rule
+ *
+ */
+ public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
+ checkNotNull(rule);
+
+ calAndSetFlowLiveTypeInternal(rule);
+ }
+
+ /**
+ * Add the typed flow entry into table, and calculates and set the flow live type,
+ * and then add it into a corresponding typed flow table.
+ *
+ * @param rule the flow rule
+ *
+ */
+ public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
+ checkNotNull(rule);
+
+ //rule have to be new DefaultTypedFlowEntry
+ boolean result = getFlowEntriesInternal(rule.id()).add(rule);
+ if (result) {
+ calAndSetFlowLiveTypeInternal(rule);
+ addWithSetFlowLiveTypeCount++;
+ } else {
+ log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
+ + " ADD Failed, cause it may already exists in table !!!,"
+ + " AdaptiveStats collection thread for {}",
+ sw.getStringId());
+ }
+ }
+
+ // In real, calculates and set the flow live type at the first time,
+ // and then add it into a corresponding typed flow table
+ private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
+ long life = rule.life();
+ FlowLiveType prevFlowLiveType = rule.flowLiveType();
+
+ if (life >= longPollInterval) {
+ rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
+ longFlows.add(rule);
+ } else if (life >= midPollInterval) {
+ rule.setFlowLiveType(FlowLiveType.MID_FLOW);
+ midFlows.add(rule);
+ } else if (life >= calAndPollInterval) {
+ rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
+ shortFlows.add(rule);
+ } else if (life >= 0) {
+ rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
+ } else { // life < 0
+ rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
+ }
+
+ if (rule.flowLiveType() != prevFlowLiveType) {
+ switch (prevFlowLiveType) {
+ // delete it from previous flow table
+ case SHORT_FLOW:
+ shortFlows.remove(rule);
+ break;
+ case MID_FLOW:
+ midFlows.remove(rule);
+ break;
+ case LONG_FLOW:
+ longFlows.remove(rule);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+
+ // check the flow live type based on current time, then set and add it into corresponding table
+ private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
+ long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
+ // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
+ long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
+ // fe.life() unit is SECOND!
+ long liveTime = fe.life() + fromLastSeen;
+
+
+ switch (fe.flowLiveType()) {
+ case IMMEDIATE_FLOW:
+ if (liveTime >= longPollInterval) {
+ fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
+ longFlows.add(fe);
+ } else if (liveTime >= midPollInterval) {
+ fe.setFlowLiveType(FlowLiveType.MID_FLOW);
+ midFlows.add(fe);
+ } else if (liveTime >= calAndPollInterval) {
+ fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
+ shortFlows.add(fe);
+ }
+ break;
+ case SHORT_FLOW:
+ if (liveTime >= longPollInterval) {
+ fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
+ shortFlows.remove(fe);
+ longFlows.add(fe);
+ } else if (liveTime >= midPollInterval) {
+ fe.setFlowLiveType(FlowLiveType.MID_FLOW);
+ shortFlows.remove(fe);
+ midFlows.add(fe);
+ }
+ break;
+ case MID_FLOW:
+ if (liveTime >= longPollInterval) {
+ fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
+ midFlows.remove(fe);
+ longFlows.add(fe);
+ }
+ break;
+ case LONG_FLOW:
+ if (fromLastSeen > entirePollInterval) {
+ log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
+ return false;
+ }
+ break;
+ case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
+ default :
+ // Error Unknown Live Type
+ log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
+ + "AdaptiveStats collection thread for {}",
+ sw.getStringId());
+ return false;
+ }
+
+ log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
+ + ", state=" + fe.state()
+ + ", After liveType=" + fe.flowLiveType()
+ + ", liveTime=" + liveTime
+ + ", life=" + fe.life()
+ + ", bytes=" + fe.bytes()
+ + ", packets=" + fe.packets()
+ + ", fromLastSeen=" + fromLastSeen
+ + ", priority=" + fe.priority()
+ + ", selector=" + fe.selector().criteria()
+ + ", treatment=" + fe.treatment()
+ + " AdaptiveStats collection thread for {}",
+ sw.getStringId());
+
+ return true;
+ }
+
+ /**
+ * Check and move live type for all type flow entries in table at every calAndPollInterval time.
+ *
+ */
+ public void checkAndMoveLiveFlowAll() {
+ Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
+
+ long calCurTime = System.currentTimeMillis();
+ typedFlowEntries.forEach(fe -> {
+ if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
+ remove(fe);
+ }
+ });
+
+ // print table counts for debug
+ if (log.isDebugEnabled()) {
+ synchronized (this) {
+ long totalFlowCount = getFlowCount();
+ long shortFlowCount = shortFlows.size();
+ long midFlowCount = midFlows.size();
+ long longFlowCount = longFlows.size();
+ long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
+ long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
+
+ log.debug("--------------------------------------------------------------------------- for {}",
+ sw.getStringId());
+ log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
+ + ", add - remove_Count=" + calTotalCount
+ + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
+ + ", SHORT_FLOW_Count=" + shortFlowCount
+ + ", MID_FLOW_Count=" + midFlowCount
+ + ", LONG_FLOW_Count=" + longFlowCount
+ + ", add_Count=" + addCount
+ + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
+ + ", remove_Count=" + removeCount
+ + " AdaptiveStats collection thread for {}", sw.getStringId());
+ log.debug("--------------------------------------------------------------------------- for {}",
+ sw.getStringId());
+ if (totalFlowCount != calTotalCount) {
+ log.error("checkAndMoveLiveFlowAll, Real total flow count and "
+ + "calculated total flow count do NOT match, something is wrong internally "
+ + "or check counter value bound is over!");
+ }
+ if (immediateFlowCount < 0) {
+ log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
+ + "something is wrong internally "
+ + "or check counter value bound is over!");
+ }
+ }
+ }
+ log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
+ }
+
+ /**
+ * Remove the typed flow entry from table.
+ *
+ * @param rule the flow rule
+ *
+ */
+ public synchronized void remove(FlowRule rule) {
+ checkNotNull(rule);
+
+ TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
+ if (removeStore != null) {
+ removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
+ boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
+
+ if (result) {
+ removeCount++;
+ }
+ }
+ }
+
+ // Remove the typed flow entry from corresponding table
+ private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
+ switch (fe.flowLiveType()) {
+ case IMMEDIATE_FLOW:
+ // do nothing
+ break;
+ case SHORT_FLOW:
+ shortFlows.remove(fe);
+ break;
+ case MID_FLOW:
+ midFlows.remove(fe);
+ break;
+ case LONG_FLOW:
+ longFlows.remove(fe);
+ break;
+ default: // error in Flow Live Type
+ log.error("removeLiveFlowsInternal, Unknown Live Type error!");
+ break;
+ }
+ }
+ }
+}
diff --git a/framework/src/onos/providers/openflow/group/pom.xml b/framework/src/onos/providers/openflow/group/pom.xml index 049ccaff..c63a016d 100644 --- a/framework/src/onos/providers/openflow/group/pom.xml +++ b/framework/src/onos/providers/openflow/group/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-of-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/framework/src/onos/providers/openflow/meter/pom.xml b/framework/src/onos/providers/openflow/meter/pom.xml index 55068cd7..46bcefcf 100644 --- a/framework/src/onos/providers/openflow/meter/pom.xml +++ b/framework/src/onos/providers/openflow/meter/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-of-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/framework/src/onos/providers/openflow/packet/pom.xml b/framework/src/onos/providers/openflow/packet/pom.xml index cc2316c9..b8dbb689 100644 --- a/framework/src/onos/providers/openflow/packet/pom.xml +++ b/framework/src/onos/providers/openflow/packet/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-of-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/framework/src/onos/providers/openflow/pom.xml b/framework/src/onos/providers/openflow/pom.xml index 1250af61..349168e1 100644 --- a/framework/src/onos/providers/openflow/pom.xml +++ b/framework/src/onos/providers/openflow/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.onosproject</groupId> <artifactId>onos-providers</artifactId> - <version>1.4.0-SNAPSHOT</version> + <version>1.4.0-rc1</version> <relativePath>../pom.xml</relativePath> </parent> |