diff options
Diffstat (limited to 'framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial')
26 files changed, 6694 insertions, 0 deletions
diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/PathKey.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/PathKey.java new file mode 100644 index 00000000..00d6c9d2 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/PathKey.java @@ -0,0 +1,55 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import org.onosproject.net.DeviceId; + +import java.util.Objects; + +/** + * Key for filing pre-computed paths between source and destination devices. + */ +class PathKey { + private final DeviceId src; + private final DeviceId dst; + + /** + * Creates a path key from the given source/dest pair. 
+ * @param src source device + * @param dst destination device + */ + PathKey(DeviceId src, DeviceId dst) { + this.src = src; + this.dst = dst; + } + + @Override + public int hashCode() { + return Objects.hash(src, dst); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof PathKey) { + final PathKey other = (PathKey) obj; + return Objects.equals(this.src, other.src) && Objects.equals(this.dst, other.dst); + } + return false; + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationIdStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationIdStore.java new file mode 100644 index 00000000..6e6b9587 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationIdStore.java @@ -0,0 +1,70 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.ApplicationIdStore; +import org.onosproject.core.DefaultApplicationId; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Simple implementation of the application ID registry using in-memory + * structures. + */ +@Component(immediate = true) +@Service +public class SimpleApplicationIdStore implements ApplicationIdStore { + + private static final AtomicInteger ID_DISPENSER = new AtomicInteger(1); + + private final Map<Short, DefaultApplicationId> appIds = new ConcurrentHashMap<>(); + private final Map<String, DefaultApplicationId> appIdsByName = new ConcurrentHashMap<>(); + + @Override + public Set<ApplicationId> getAppIds() { + return ImmutableSet.<ApplicationId>copyOf(appIds.values()); + } + + @Override + public ApplicationId getAppId(Short id) { + return appIds.get(id); + } + + @Override + public ApplicationId getAppId(String name) { + return appIdsByName.get(name); + } + + @Override + public ApplicationId registerApplication(String name) { + DefaultApplicationId appId = appIdsByName.get(name); + if (appId == null) { + short id = (short) ID_DISPENSER.getAndIncrement(); + appId = new DefaultApplicationId(id, name); + appIds.put(id, appId); + appIdsByName.put(name, appId); + } + return appId; + } + +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationStore.java new file mode 100644 index 00000000..ea9a773e --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationStore.java @@ -0,0 
+1,170 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.app.ApplicationDescription; +import org.onosproject.app.ApplicationEvent; +import org.onosproject.app.ApplicationState; +import org.onosproject.app.ApplicationStore; +import org.onosproject.common.app.ApplicationArchive; +import org.onosproject.core.Application; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.ApplicationIdStore; +import org.onosproject.core.DefaultApplication; +import org.onosproject.security.Permission; +import org.slf4j.Logger; + +import java.io.InputStream; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.onosproject.app.ApplicationEvent.Type.*; +import static org.onosproject.app.ApplicationState.ACTIVE; +import static org.onosproject.app.ApplicationState.INSTALLED; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages inventory of network control applications. 
+ */ +@Component(immediate = true) +@Service +public class SimpleApplicationStore extends ApplicationArchive implements ApplicationStore { + + private final Logger log = getLogger(getClass()); + + // App inventory & states + private final ConcurrentMap<ApplicationId, DefaultApplication> apps = new ConcurrentHashMap<>(); + private final ConcurrentMap<ApplicationId, ApplicationState> states = new ConcurrentHashMap<>(); + private final ConcurrentMap<ApplicationId, Set<Permission>> permissions = new ConcurrentHashMap<>(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ApplicationIdStore idStore; + + @Activate + public void activate() { + loadFromDisk(); + log.info("Started"); + } + + private void loadFromDisk() { + for (String name : getApplicationNames()) { + ApplicationId appId = idStore.registerApplication(name); + ApplicationDescription appDesc = getApplicationDescription(name); + DefaultApplication app = + new DefaultApplication(appId, appDesc.version(), + appDesc.description(), appDesc.origin(), + appDesc.role(), appDesc.permissions(), + appDesc.featuresRepo(), appDesc.features()); + apps.put(appId, app); + states.put(appId, isActive(name) ? 
INSTALLED : ACTIVE); + // load app permissions + } + } + + @Deactivate + public void deactivate() { + apps.clear(); + states.clear(); + permissions.clear(); + log.info("Stopped"); + } + + @Override + public Set<Application> getApplications() { + return ImmutableSet.copyOf(apps.values()); + } + + @Override + public ApplicationId getId(String name) { + return idStore.getAppId(name); + } + + @Override + public Application getApplication(ApplicationId appId) { + return apps.get(appId); + } + + @Override + public ApplicationState getState(ApplicationId appId) { + return states.get(appId); + } + + @Override + public Application create(InputStream appDescStream) { + ApplicationDescription appDesc = saveApplication(appDescStream); + ApplicationId appId = idStore.registerApplication(appDesc.name()); + DefaultApplication app = + new DefaultApplication(appId, appDesc.version(), appDesc.description(), + appDesc.origin(), appDesc.role(), appDesc.permissions(), + appDesc.featuresRepo(), appDesc.features()); + apps.put(appId, app); + states.put(appId, INSTALLED); + delegate.notify(new ApplicationEvent(APP_INSTALLED, app)); + return app; + } + + @Override + public void remove(ApplicationId appId) { + Application app = apps.remove(appId); + if (app != null) { + states.remove(appId); + delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app)); + purgeApplication(app.id().name()); + } + } + + @Override + public void activate(ApplicationId appId) { + Application app = apps.get(appId); + if (app != null) { + setActive(appId.name()); + states.put(appId, ACTIVE); + delegate.notify(new ApplicationEvent(APP_ACTIVATED, app)); + } + } + + @Override + public void deactivate(ApplicationId appId) { + Application app = apps.get(appId); + if (app != null) { + clearActive(appId.name()); + states.put(appId, INSTALLED); + delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app)); + } + } + + @Override + public Set<Permission> getPermissions(ApplicationId appId) { + return permissions.get(appId); 
+ } + + @Override + public void setPermissions(ApplicationId appId, Set<Permission> permissions) { + Application app = getApplication(appId); + if (app != null) { + this.permissions.put(appId, permissions); + delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app)); + } + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationStoreTest.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationStoreTest.java new file mode 100644 index 00000000..a1c7da37 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleApplicationStoreTest.java @@ -0,0 +1,154 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Files; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.util.Tools; +import org.onosproject.app.ApplicationEvent; +import org.onosproject.app.ApplicationStoreDelegate; +import org.onosproject.common.app.ApplicationArchive; +import org.onosproject.core.Application; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.ApplicationIdStoreAdapter; +import org.onosproject.core.DefaultApplicationId; +import org.onosproject.security.AppPermission; +import org.onosproject.security.Permission; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.onosproject.app.ApplicationEvent.Type.*; +import static org.onosproject.app.ApplicationState.ACTIVE; +import static org.onosproject.app.ApplicationState.INSTALLED; + +/** + * Test of the trivial application store implementation. 
+ */ +public class SimpleApplicationStoreTest { + + static final File STORE = Files.createTempDir(); + + private TestApplicationStore store = new TestApplicationStore(); + private TestDelegate delegate = new TestDelegate(); + private static final Object LOCK = new Object(); + + @Before + public void setUp() { + store.idStore = new TestIdStore(); + store.setRootPath(STORE.getAbsolutePath()); + store.setDelegate(delegate); + store.activate(); + } + + @After + public void tearDown() throws IOException { + if (STORE.exists()) { + Tools.removeDirectory(STORE); + } + store.deactivate(); + } + + private Application createTestApp() { + synchronized (LOCK) { + return store.create(ApplicationArchive.class.getResourceAsStream("app.zip")); + } + } + + @Test + public void create() { + Application app = createTestApp(); + assertEquals("incorrect name", "org.foo.app", app.id().name()); + assertEquals("incorrect app count", 1, store.getApplications().size()); + assertEquals("incorrect app", app, store.getApplication(app.id())); + assertEquals("incorrect app state", INSTALLED, store.getState(app.id())); + assertEquals("incorrect event type", APP_INSTALLED, delegate.event.type()); + assertEquals("incorrect event app", app, delegate.event.subject()); + } + + @Test + public void remove() { + Application app = createTestApp(); + store.remove(app.id()); + assertEquals("incorrect app count", 0, store.getApplications().size()); + assertEquals("incorrect event type", APP_UNINSTALLED, delegate.event.type()); + assertEquals("incorrect event app", app, delegate.event.subject()); + } + + @Test + public void activate() { + Application app = createTestApp(); + store.activate(app.id()); + assertEquals("incorrect app count", 1, store.getApplications().size()); + assertEquals("incorrect app state", ACTIVE, store.getState(app.id())); + assertEquals("incorrect event type", APP_ACTIVATED, delegate.event.type()); + assertEquals("incorrect event app", app, delegate.event.subject()); + } + + @Test + 
public void deactivate() { + Application app = createTestApp(); + store.deactivate(app.id()); + assertEquals("incorrect app count", 1, store.getApplications().size()); + assertEquals("incorrect app state", INSTALLED, store.getState(app.id())); + assertEquals("incorrect event type", APP_DEACTIVATED, delegate.event.type()); + assertEquals("incorrect event app", app, delegate.event.subject()); + } + + @Test + public void permissions() { + Application app = createTestApp(); + ImmutableSet<Permission> permissions = + ImmutableSet.of(new Permission(AppPermission.class.getName(), "FLOWRULE_WRITE")); + store.setPermissions(app.id(), permissions); + assertEquals("incorrect app perms", 1, store.getPermissions(app.id()).size()); + assertEquals("incorrect app state", INSTALLED, store.getState(app.id())); + assertEquals("incorrect event type", APP_PERMISSIONS_CHANGED, delegate.event.type()); + assertEquals("incorrect event app", app, delegate.event.subject()); + } + + private class TestIdStore extends ApplicationIdStoreAdapter { + @Override + public ApplicationId registerApplication(String name) { + return new DefaultApplicationId(1, name); + } + + @Override + public ApplicationId getAppId(String name) { + return new DefaultApplicationId(1, name); + } + } + + private class TestDelegate implements ApplicationStoreDelegate { + private ApplicationEvent event; + + @Override + public void notify(ApplicationEvent event) { + this.event = event; + } + } + + private class TestApplicationStore extends SimpleApplicationStore { + @Override + public void setRootPath(String root) { + super.setRootPath(root); + } + } +}
\ No newline at end of file diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleClusterStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleClusterStore.java new file mode 100644 index 00000000..5eea3cc8 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleClusterStore.java @@ -0,0 +1,139 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.joda.time.DateTime; +import org.onlab.packet.IpAddress; +import org.onosproject.cluster.ClusterEvent; +import org.onosproject.cluster.ClusterStore; +import org.onosproject.cluster.ClusterStoreDelegate; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.DefaultControllerNode; +import org.onosproject.cluster.NodeId; +import org.onosproject.event.EventDeliveryService; +import org.onosproject.event.ListenerRegistry; +import org.onosproject.net.intent.Key; +import org.onosproject.net.intent.PartitionEvent; +import org.onosproject.net.intent.PartitionEventListener; +import org.onosproject.net.intent.PartitionService; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import java.util.Set; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages inventory of infrastructure devices using trivial in-memory + * structures implementation. 
+ */ +@Component(immediate = true) +@Service +public class SimpleClusterStore + extends AbstractStore<ClusterEvent, ClusterStoreDelegate> + implements ClusterStore, PartitionService { + + public static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1"); + + private final Logger log = getLogger(getClass()); + + private ControllerNode instance; + + private final DateTime creationTime = DateTime.now(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected EventDeliveryService eventDispatcher; + + private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry; + + @Activate + public void activate() { + instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST); + + listenerRegistry = new ListenerRegistry<>(); + eventDispatcher.addSink(PartitionEvent.class, listenerRegistry); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + eventDispatcher.removeSink(PartitionEvent.class); + log.info("Stopped"); + } + + + @Override + public ControllerNode getLocalNode() { + return instance; + } + + @Override + public Set<ControllerNode> getNodes() { + return ImmutableSet.of(instance); + } + + @Override + public ControllerNode getNode(NodeId nodeId) { + return instance.id().equals(nodeId) ? 
instance : null; + } + + @Override + public ControllerNode.State getState(NodeId nodeId) { + return ControllerNode.State.ACTIVE; + } + + @Override + public DateTime getLastUpdated(NodeId nodeId) { + return creationTime; + } + + @Override + public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) { + return null; + } + + @Override + public void removeNode(NodeId nodeId) { + } + + @Override + public boolean isMine(Key intentKey) { + return true; + } + + @Override + public NodeId getLeader(Key intentKey) { + return instance.id(); + } + + @Override + public void addListener(PartitionEventListener listener) { + listenerRegistry.addListener(listener); + } + + @Override + public void removeListener(PartitionEventListener listener) { + listenerRegistry.removeListener(listener); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleComponentConfigStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleComponentConfigStore.java new file mode 100644 index 00000000..1d8bcd62 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleComponentConfigStore.java @@ -0,0 +1,62 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.cfg.ComponentConfigEvent; +import org.onosproject.cfg.ComponentConfigStore; +import org.onosproject.cfg.ComponentConfigStoreDelegate; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_SET; +import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_UNSET; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages inventory of component configuration properties. + */ +@Component(immediate = true) +@Service +public class SimpleComponentConfigStore + extends AbstractStore<ComponentConfigEvent, ComponentConfigStoreDelegate> + implements ComponentConfigStore { + + private final Logger log = getLogger(getClass()); + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public void setProperty(String componentName, String name, String value) { + delegate.notify(new ComponentConfigEvent(PROPERTY_SET, componentName, name, value)); + } + + @Override + public void unsetProperty(String componentName, String name) { + delegate.notify(new ComponentConfigEvent(PROPERTY_UNSET, componentName, name, null)); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleDeviceStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleDeviceStore.java new file mode 100644 index 00000000..fc90dfad --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleDeviceStore.java @@ -0,0 +1,691 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.AnnotationsUtil; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.DefaultDevice; +import org.onosproject.net.DefaultPort; +import org.onosproject.net.Device; +import org.onosproject.net.Device.Type; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Port; +import org.onosproject.net.PortNumber; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.device.DefaultDeviceDescription; +import org.onosproject.net.device.DefaultPortDescription; +import org.onosproject.net.device.DefaultPortStatistics; +import org.onosproject.net.device.DeviceDescription; +import org.onosproject.net.device.DeviceEvent; +import org.onosproject.net.device.DeviceStore; +import org.onosproject.net.device.DeviceStoreDelegate; +import org.onosproject.net.device.PortDescription; +import org.onosproject.net.device.PortStatistics; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.AbstractStore; 
+import org.onlab.packet.ChassisId; +import org.onlab.util.NewConcurrentHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Predicates.notNull; +import static com.google.common.base.Verify.verify; +import static org.onosproject.net.device.DeviceEvent.Type.*; +import static org.slf4j.LoggerFactory.getLogger; +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; +import static org.onosproject.net.DefaultAnnotations.union; +import static org.onosproject.net.DefaultAnnotations.merge; + +/** + * Manages inventory of infrastructure devices using trivial in-memory + * structures implementation. 
+ */ +@Component(immediate = true) +@Service +public class SimpleDeviceStore + extends AbstractStore<DeviceEvent, DeviceStoreDelegate> + implements DeviceStore { + + private final Logger log = getLogger(getClass()); + + public static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; + + // Collection of Description given from various providers + private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>> + deviceDescs = Maps.newConcurrentMap(); + + // Cache of Device and Ports generated by compositing descriptions from providers + private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap(); + private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> + devicePorts = Maps.newConcurrentMap(); + private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, PortStatistics>> + devicePortStats = Maps.newConcurrentMap(); + private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, PortStatistics>> + devicePortDeltaStats = Maps.newConcurrentMap(); + + // Available (=UP) devices + private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet(); + + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + deviceDescs.clear(); + devices.clear(); + devicePorts.clear(); + availableDevices.clear(); + log.info("Stopped"); + } + + @Override + public int getDeviceCount() { + return devices.size(); + } + + @Override + public Iterable<Device> getDevices() { + return Collections.unmodifiableCollection(devices.values()); + } + + @Override + public Iterable<Device> getAvailableDevices() { + return FluentIterable.from(getDevices()) + .filter(new Predicate<Device>() { + + @Override + public boolean apply(Device input) { + return isAvailable(input.id()); + } + }); + } + + @Override + public Device getDevice(DeviceId deviceId) { + return devices.get(deviceId); + } + + @Override + public DeviceEvent createOrUpdateDevice(ProviderId providerId, + DeviceId 
deviceId, + DeviceDescription deviceDescription) { + Map<ProviderId, DeviceDescriptions> providerDescs + = getOrCreateDeviceDescriptions(deviceId); + + synchronized (providerDescs) { + // locking per device + DeviceDescriptions descs + = getOrCreateProviderDeviceDescriptions(providerDescs, + providerId, + deviceDescription); + + Device oldDevice = devices.get(deviceId); + // update description + descs.putDeviceDesc(deviceDescription); + Device newDevice = composeDevice(deviceId, providerDescs); + + if (oldDevice == null) { + // ADD + return createDevice(providerId, newDevice); + } else { + // UPDATE or ignore (no change or stale) + return updateDevice(providerId, oldDevice, newDevice); + } + } + } + + // Creates the device and returns the appropriate event if necessary. + // Guarded by deviceDescs value (=Device lock) + private DeviceEvent createDevice(ProviderId providerId, Device newDevice) { + // update composed device cache + Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); + verify(oldDevice == null, + "Unexpected Device in cache. PID:%s [old=%s, new=%s]", + providerId, oldDevice, newDevice); + + if (!providerId.isAncillary()) { + availableDevices.add(newDevice.id()); + } + + return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); + } + + // Updates the device and returns the appropriate event if necessary. + // Guarded by deviceDescs value (=Device lock) + private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) { + // We allow only certain attributes to trigger update + boolean propertiesChanged = + !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || + !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()); + boolean annotationsChanged = + !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations()); + + // Primary providers can respond to all changes, but ancillary ones + // should respond only to annotation changes. 
+ if ((providerId.isAncillary() && annotationsChanged) || + (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) { + + boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice); + if (!replaced) { + // FIXME: Is the enclosing if required here? + verify(replaced, + "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", + providerId, oldDevice, devices.get(newDevice.id()) + , newDevice); + } + if (!providerId.isAncillary()) { + availableDevices.add(newDevice.id()); + } + return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); + } + + // Otherwise merely attempt to change availability if primary provider + if (!providerId.isAncillary()) { + boolean added = availableDevices.add(newDevice.id()); + return !added ? null : + new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null); + } + return null; + } + + @Override + public DeviceEvent markOffline(DeviceId deviceId) { + Map<ProviderId, DeviceDescriptions> providerDescs + = getOrCreateDeviceDescriptions(deviceId); + + // locking device + synchronized (providerDescs) { + Device device = devices.get(deviceId); + if (device == null) { + return null; + } + boolean removed = availableDevices.remove(deviceId); + if (removed) { + // TODO: broadcast ... DOWN only? 
+ return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); + } + return null; + } + } + + @Override + public List<DeviceEvent> updatePorts(ProviderId providerId, + DeviceId deviceId, + List<PortDescription> portDescriptions) { + Device device = devices.get(deviceId); + if (device == null) { + log.debug("Device {} doesn't exist or hasn't been initialized yet", deviceId); + return Collections.emptyList(); + } + + Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); + checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); + + List<DeviceEvent> events = new ArrayList<>(); + synchronized (descsMap) { + DeviceDescriptions descs = descsMap.get(providerId); + // every provider must provide DeviceDescription. + checkArgument(descs != null, + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); + + Map<PortNumber, Port> ports = getPortMap(deviceId); + + // Add new ports + Set<PortNumber> processed = new HashSet<>(); + for (PortDescription portDescription : portDescriptions) { + final PortNumber number = portDescription.portNumber(); + processed.add(portDescription.portNumber()); + + final Port oldPort = ports.get(number); + final Port newPort; + +// event suppression hook? + + // update description + descs.putPortDesc(portDescription); + newPort = composePort(device, number, descsMap); + + events.add(oldPort == null ? + createPort(device, newPort, ports) : + updatePort(device, oldPort, newPort, ports)); + } + + events.addAll(pruneOldPorts(device, ports, processed)); + } + return FluentIterable.from(events).filter(notNull()).toList(); + } + + // Creates a new port based on the port description adds it to the map and + // Returns corresponding event. 
+ // Guarded by deviceDescs value (=Device lock) + private DeviceEvent createPort(Device device, Port newPort, + Map<PortNumber, Port> ports) { + ports.put(newPort.number(), newPort); + return new DeviceEvent(PORT_ADDED, device, newPort); + } + + // Checks if the specified port requires update and if so, it replaces the + // existing entry in the map and returns corresponding event. + // Guarded by deviceDescs value (=Device lock) + private DeviceEvent updatePort(Device device, Port oldPort, + Port newPort, + Map<PortNumber, Port> ports) { + if (oldPort.isEnabled() != newPort.isEnabled() || + oldPort.type() != newPort.type() || + oldPort.portSpeed() != newPort.portSpeed() || + !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) { + ports.put(oldPort.number(), newPort); + return new DeviceEvent(PORT_UPDATED, device, newPort); + } + return null; + } + + // Prunes the specified list of ports based on which ports are in the + // processed list and returns list of corresponding events. + // Guarded by deviceDescs value (=Device lock) + private List<DeviceEvent> pruneOldPorts(Device device, + Map<PortNumber, Port> ports, + Set<PortNumber> processed) { + List<DeviceEvent> events = new ArrayList<>(); + Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<PortNumber, Port> e = iterator.next(); + PortNumber portNumber = e.getKey(); + if (!processed.contains(portNumber)) { + events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue())); + iterator.remove(); + } + } + return events; + } + + // Gets the map of ports for the specified device; if one does not already + // exist, it creates and registers a new one. 
+ private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) { + return createIfAbsentUnchecked(devicePorts, deviceId, + NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); + } + + private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptions( + DeviceId deviceId) { + Map<ProviderId, DeviceDescriptions> r; + r = deviceDescs.get(deviceId); + if (r != null) { + return r; + } + r = new HashMap<>(); + final Map<ProviderId, DeviceDescriptions> concurrentlyAdded; + concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r); + if (concurrentlyAdded != null) { + return concurrentlyAdded; + } else { + return r; + } + } + + // Guarded by deviceDescs value (=Device lock) + private DeviceDescriptions getOrCreateProviderDeviceDescriptions( + Map<ProviderId, DeviceDescriptions> device, + ProviderId providerId, DeviceDescription deltaDesc) { + synchronized (device) { + DeviceDescriptions r = device.get(providerId); + if (r == null) { + r = new DeviceDescriptions(deltaDesc); + device.put(providerId, r); + } + return r; + } + } + + @Override + public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId, + PortDescription portDescription) { + Device device = devices.get(deviceId); + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); + + Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); + checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); + + synchronized (descsMap) { + DeviceDescriptions descs = descsMap.get(providerId); + // assuming all providers must give DeviceDescription first + checkArgument(descs != null, + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); + + ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); + final PortNumber number = portDescription.portNumber(); + final Port oldPort = ports.get(number); + final Port newPort; + + // update description + descs.putPortDesc(portDescription); + newPort = composePort(device, number, descsMap); 
+ + if (oldPort == null) { + return createPort(device, newPort, ports); + } else { + return updatePort(device, oldPort, newPort, ports); + } + } + } + + @Override + public List<Port> getPorts(DeviceId deviceId) { + Map<PortNumber, Port> ports = devicePorts.get(deviceId); + if (ports == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(ports.values()); + } + + @Override + public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId, + Collection<PortStatistics> newStatsCollection) { + + ConcurrentMap<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId); + ConcurrentMap<PortNumber, PortStatistics> newStatsMap = Maps.newConcurrentMap(); + ConcurrentMap<PortNumber, PortStatistics> deltaStatsMap = Maps.newConcurrentMap(); + + if (prvStatsMap != null) { + for (PortStatistics newStats : newStatsCollection) { + PortNumber port = PortNumber.portNumber(newStats.port()); + PortStatistics prvStats = prvStatsMap.get(port); + DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); + PortStatistics deltaStats = builder.build(); + if (prvStats != null) { + deltaStats = calcDeltaStats(deviceId, prvStats, newStats); + } + deltaStatsMap.put(port, deltaStats); + newStatsMap.put(port, newStats); + } + } else { + for (PortStatistics newStats : newStatsCollection) { + PortNumber port = PortNumber.portNumber(newStats.port()); + newStatsMap.put(port, newStats); + } + } + devicePortDeltaStats.put(deviceId, deltaStatsMap); + devicePortStats.put(deviceId, newStatsMap); + return new DeviceEvent(PORT_STATS_UPDATED, devices.get(deviceId), null); + } + + public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) { + // calculate time difference + long deltaStatsSec, deltaStatsNano; + if (newStats.durationNano() < prvStats.durationNano()) { + deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1); + deltaStatsSec = 
newStats.durationSec() - prvStats.durationSec() - 1L; + } else { + deltaStatsNano = newStats.durationNano() - prvStats.durationNano(); + deltaStatsSec = newStats.durationSec() - prvStats.durationSec(); + } + DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); + DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId) + .setPort(newStats.port()) + .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived()) + .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent()) + .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived()) + .setBytesSent(newStats.bytesSent() - prvStats.bytesSent()) + .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped()) + .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped()) + .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors()) + .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors()) + .setDurationSec(deltaStatsSec) + .setDurationNano(deltaStatsNano) + .build(); + return deltaStats; + } + + @Override + public Port getPort(DeviceId deviceId, PortNumber portNumber) { + Map<PortNumber, Port> ports = devicePorts.get(deviceId); + return ports == null ? 
null : ports.get(portNumber); + } + + @Override + public List<PortStatistics> getPortStatistics(DeviceId deviceId) { + Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId); + if (portStats == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(portStats.values()); + } + + @Override + public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) { + Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId); + if (portStats == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(portStats.values()); + } + + @Override + public boolean isAvailable(DeviceId deviceId) { + return availableDevices.contains(deviceId); + } + + @Override + public DeviceEvent removeDevice(DeviceId deviceId) { + Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptions(deviceId); + synchronized (descs) { + Device device = devices.remove(deviceId); + // should DEVICE_REMOVED carry removed ports? + Map<PortNumber, Port> ports = devicePorts.get(deviceId); + if (ports != null) { + ports.clear(); + } + availableDevices.remove(deviceId); + descs.clear(); + return device == null ? null : + new DeviceEvent(DEVICE_REMOVED, device, null); + } + } + + /** + * Returns a Device, merging description given from multiple Providers. 
+ * + * @param deviceId device identifier + * @param providerDescs Collection of Descriptions from multiple providers + * @return Device instance + */ + private Device composeDevice(DeviceId deviceId, + Map<ProviderId, DeviceDescriptions> providerDescs) { + + checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied"); + + ProviderId primary = pickPrimaryPID(providerDescs); + + DeviceDescriptions desc = providerDescs.get(primary); + + final DeviceDescription base = desc.getDeviceDesc(); + Type type = base.type(); + String manufacturer = base.manufacturer(); + String hwVersion = base.hwVersion(); + String swVersion = base.swVersion(); + String serialNumber = base.serialNumber(); + ChassisId chassisId = base.chassisId(); + DefaultAnnotations annotations = DefaultAnnotations.builder().build(); + annotations = merge(annotations, base.annotations()); + + for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { + if (e.getKey().equals(primary)) { + continue; + } + // TODO: should keep track of Description timestamp + // and only merge conflicting keys when timestamp is newer + // Currently assuming there will never be a key conflict between + // providers + + // annotation merging. not so efficient, should revisit later + annotations = merge(annotations, e.getValue().getDeviceDesc().annotations()); + } + + return new DefaultDevice(primary, deviceId, type, manufacturer, + hwVersion, swVersion, serialNumber, + chassisId, annotations); + } + + /** + * Returns a Port, merging description given from multiple Providers. 
+ * + * @param device device the port is on + * @param number port number + * @param descsMap Collection of Descriptions from multiple providers + * @return Port instance + */ + private Port composePort(Device device, PortNumber number, + Map<ProviderId, DeviceDescriptions> descsMap) { + + ProviderId primary = pickPrimaryPID(descsMap); + DeviceDescriptions primDescs = descsMap.get(primary); + // if no primary, assume not enabled + // TODO: revisit this default port enabled/disabled behavior + boolean isEnabled = false; + DefaultAnnotations annotations = DefaultAnnotations.builder().build(); + + final PortDescription portDesc = primDescs.getPortDesc(number); + if (portDesc != null) { + isEnabled = portDesc.isEnabled(); + annotations = merge(annotations, portDesc.annotations()); + } + + for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) { + if (e.getKey().equals(primary)) { + continue; + } + // TODO: should keep track of Description timestamp + // and only merge conflicting keys when timestamp is newer + // Currently assuming there will never be a key conflict between + // providers + + // annotation merging. not so efficient, should revisit later + final PortDescription otherPortDesc = e.getValue().getPortDesc(number); + if (otherPortDesc != null) { + annotations = merge(annotations, otherPortDesc.annotations()); + } + } + + return portDesc == null ? 
+ new DefaultPort(device, number, false, annotations) : + new DefaultPort(device, number, isEnabled, portDesc.type(), + portDesc.portSpeed(), annotations); + } + + /** + * @return primary ProviderID, or randomly chosen one if none exists + */ + private ProviderId pickPrimaryPID(Map<ProviderId, DeviceDescriptions> descsMap) { + ProviderId fallBackPrimary = null; + for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) { + if (!e.getKey().isAncillary()) { + return e.getKey(); + } else if (fallBackPrimary == null) { + // pick randomly as a fallback in case there is no primary + fallBackPrimary = e.getKey(); + } + } + return fallBackPrimary; + } + + /** + * Collection of Description of a Device and it's Ports given from a Provider. + */ + private static class DeviceDescriptions { + + private final AtomicReference<DeviceDescription> deviceDesc; + private final ConcurrentMap<PortNumber, PortDescription> portDescs; + + public DeviceDescriptions(DeviceDescription desc) { + this.deviceDesc = new AtomicReference<>(checkNotNull(desc)); + this.portDescs = new ConcurrentHashMap<>(); + } + + public DeviceDescription getDeviceDesc() { + return deviceDesc.get(); + } + + public PortDescription getPortDesc(PortNumber number) { + return portDescs.get(number); + } + + /** + * Puts DeviceDescription, merging annotations as necessary. + * + * @param newDesc new DeviceDescription + * @return previous DeviceDescription + */ + public synchronized DeviceDescription putDeviceDesc(DeviceDescription newDesc) { + DeviceDescription oldOne = deviceDesc.get(); + DeviceDescription newOne = newDesc; + if (oldOne != null) { + SparseAnnotations merged = union(oldOne.annotations(), + newDesc.annotations()); + newOne = new DefaultDeviceDescription(newOne, merged); + } + return deviceDesc.getAndSet(newOne); + } + + /** + * Puts PortDescription, merging annotations as necessary. 
+ * + * @param newDesc new PortDescription + * @return previous PortDescription + */ + public synchronized PortDescription putPortDesc(PortDescription newDesc) { + PortDescription oldOne = portDescs.get(newDesc.portNumber()); + PortDescription newOne = newDesc; + if (oldOne != null) { + SparseAnnotations merged = union(oldOne.annotations(), + newDesc.annotations()); + newOne = new DefaultPortDescription(newOne, merged); + } + return portDescs.put(newOne.portNumber(), newOne); + } + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleDeviceStoreTest.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleDeviceStoreTest.java new file mode 100644 index 00000000..562e6f3c --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleDeviceStoreTest.java @@ -0,0 +1,530 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * + */ +package org.onosproject.store.trivial; + +import static org.junit.Assert.*; +import static org.onosproject.net.Device.Type.SWITCH; +import static org.onosproject.net.DeviceId.deviceId; +import static org.onosproject.net.device.DeviceEvent.Type.*; +import static org.onosproject.net.NetTestTools.assertAnnotationsEquals; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.Device; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Port; +import org.onosproject.net.PortNumber; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.device.DefaultDeviceDescription; +import org.onosproject.net.device.DefaultPortDescription; +import org.onosproject.net.device.DeviceDescription; +import org.onosproject.net.device.DeviceEvent; +import org.onosproject.net.device.DeviceStore; +import org.onosproject.net.device.DeviceStoreDelegate; +import org.onosproject.net.device.PortDescription; +import org.onosproject.net.provider.ProviderId; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.onlab.packet.ChassisId; + +/** + * Test of the simple DeviceStore implementation. 
+ */ +public class SimpleDeviceStoreTest { + + private static final ProviderId PID = new ProviderId("of", "foo"); + private static final ProviderId PIDA = new ProviderId("of", "bar", true); + private static final DeviceId DID1 = deviceId("of:foo"); + private static final DeviceId DID2 = deviceId("of:bar"); + private static final String MFR = "whitebox"; + private static final String HW = "1.1.x"; + private static final String SW1 = "3.8.1"; + private static final String SW2 = "3.9.5"; + private static final String SN = "43311-12345"; + private static final ChassisId CID = new ChassisId(); + + private static final PortNumber P1 = PortNumber.portNumber(1); + private static final PortNumber P2 = PortNumber.portNumber(2); + private static final PortNumber P3 = PortNumber.portNumber(3); + + private static final SparseAnnotations A1 = DefaultAnnotations.builder() + .set("A1", "a1") + .set("B1", "b1") + .build(); + private static final SparseAnnotations A1_2 = DefaultAnnotations.builder() + .remove("A1") + .set("B3", "b3") + .build(); + private static final SparseAnnotations A2 = DefaultAnnotations.builder() + .set("A2", "a2") + .set("B2", "b2") + .build(); + private static final SparseAnnotations A2_2 = DefaultAnnotations.builder() + .remove("A2") + .set("B4", "b4") + .build(); + + private SimpleDeviceStore simpleDeviceStore; + private DeviceStore deviceStore; + + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + + @Before + public void setUp() throws Exception { + simpleDeviceStore = new SimpleDeviceStore(); + simpleDeviceStore.activate(); + deviceStore = simpleDeviceStore; + } + + @After + public void tearDown() throws Exception { + simpleDeviceStore.deactivate(); + } + + private void putDevice(DeviceId deviceId, String swVersion, + SparseAnnotations... 
annotations) { + DeviceDescription description = + new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR, + HW, swVersion, SN, CID, annotations); + deviceStore.createOrUpdateDevice(PID, deviceId, description); + } + + private void putDeviceAncillary(DeviceId deviceId, String swVersion, + SparseAnnotations... annotations) { + DeviceDescription description = + new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR, + HW, swVersion, SN, CID, annotations); + deviceStore.createOrUpdateDevice(PIDA, deviceId, description); + } + + private static void assertDevice(DeviceId id, String swVersion, Device device) { + assertNotNull(device); + assertEquals(id, device.id()); + assertEquals(MFR, device.manufacturer()); + assertEquals(HW, device.hwVersion()); + assertEquals(swVersion, device.swVersion()); + assertEquals(SN, device.serialNumber()); + } + + @Test + public final void testGetDeviceCount() { + assertEquals("initialy empty", 0, deviceStore.getDeviceCount()); + + putDevice(DID1, SW1); + putDevice(DID2, SW2); + putDevice(DID1, SW1); + + assertEquals("expect 2 uniq devices", 2, deviceStore.getDeviceCount()); + } + + @Test + public final void testGetDevices() { + assertEquals("initialy empty", 0, Iterables.size(deviceStore.getDevices())); + + putDevice(DID1, SW1); + putDevice(DID2, SW2); + putDevice(DID1, SW1); + + assertEquals("expect 2 uniq devices", + 2, Iterables.size(deviceStore.getDevices())); + + Map<DeviceId, Device> devices = new HashMap<>(); + for (Device device : deviceStore.getDevices()) { + devices.put(device.id(), device); + } + + assertDevice(DID1, SW1, devices.get(DID1)); + assertDevice(DID2, SW2, devices.get(DID2)); + + // add case for new node? 
+ } + + @Test + public final void testGetDevice() { + + putDevice(DID1, SW1); + + assertDevice(DID1, SW1, deviceStore.getDevice(DID1)); + assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2)); + } + + @Test + public final void testCreateOrUpdateDevice() { + DeviceDescription description = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW1, SN, CID); + DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description); + assertEquals(DEVICE_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + + DeviceDescription description2 = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW2, SN, CID); + DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2); + assertEquals(DEVICE_UPDATED, event2.type()); + assertDevice(DID1, SW2, event2.subject()); + + assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2)); + } + + @Test + public final void testCreateOrUpdateDeviceAncillary() { + DeviceDescription description = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW1, SN, CID, A2); + DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description); + assertEquals(DEVICE_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertEquals(PIDA, event.subject().providerId()); + assertAnnotationsEquals(event.subject().annotations(), A2); + assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1)); + + DeviceDescription description2 = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW2, SN, CID, A1); + DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2); + assertEquals(DEVICE_UPDATED, event2.type()); + assertDevice(DID1, SW2, event2.subject()); + assertEquals(PID, event2.subject().providerId()); + assertAnnotationsEquals(event2.subject().annotations(), A1, A2); + assertTrue(deviceStore.isAvailable(DID1)); + + assertNull("No change expected", 
deviceStore.createOrUpdateDevice(PID, DID1, description2)); + + // For now, Ancillary is ignored once primary appears + assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description)); + + // But, Ancillary annotations will be in effect + DeviceDescription description3 = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW1, SN, CID, A2_2); + DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3); + assertEquals(DEVICE_UPDATED, event3.type()); + // basic information will be the one from Primary + assertDevice(DID1, SW2, event3.subject()); + assertEquals(PID, event3.subject().providerId()); + // but annotation from Ancillary will be merged + assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2); + assertTrue(deviceStore.isAvailable(DID1)); + } + + + @Test + public final void testMarkOffline() { + + putDevice(DID1, SW1); + assertTrue(deviceStore.isAvailable(DID1)); + + DeviceEvent event = deviceStore.markOffline(DID1); + assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertFalse(deviceStore.isAvailable(DID1)); + + DeviceEvent event2 = deviceStore.markOffline(DID1); + assertNull("No change, no event", event2); +} + + @Test + public final void testUpdatePorts() { + putDevice(DID1, SW1); + List<PortDescription> pds = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, true), + new DefaultPortDescription(P2, true) + ); + + List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds); + + Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2); + for (DeviceEvent event : events) { + assertEquals(PORT_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertTrue("PortNumber is one of expected", + expectedPorts.remove(event.port().number())); + assertTrue("Port is enabled", event.port().isEnabled()); + } + assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty()); + + + 
List<PortDescription> pds2 = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, false), + new DefaultPortDescription(P2, true), + new DefaultPortDescription(P3, true) + ); + + events = deviceStore.updatePorts(PID, DID1, pds2); + assertFalse("event should be triggered", events.isEmpty()); + for (DeviceEvent event : events) { + PortNumber num = event.port().number(); + if (P1.equals(num)) { + assertEquals(PORT_UPDATED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertFalse("Port is disabled", event.port().isEnabled()); + } else if (P2.equals(num)) { + fail("P2 event not expected."); + } else if (P3.equals(num)) { + assertEquals(PORT_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertTrue("Port is enabled", event.port().isEnabled()); + } else { + fail("Unknown port number encountered: " + num); + } + } + + List<PortDescription> pds3 = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, false), + new DefaultPortDescription(P2, true) + ); + events = deviceStore.updatePorts(PID, DID1, pds3); + assertFalse("event should be triggered", events.isEmpty()); + for (DeviceEvent event : events) { + PortNumber num = event.port().number(); + if (P1.equals(num)) { + fail("P1 event not expected."); + } else if (P2.equals(num)) { + fail("P2 event not expected."); + } else if (P3.equals(num)) { + assertEquals(PORT_REMOVED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertTrue("Port was enabled", event.port().isEnabled()); + } else { + fail("Unknown port number encountered: " + num); + } + } + + } + + @Test + public final void testUpdatePortStatus() { + putDevice(DID1, SW1); + List<PortDescription> pds = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, true) + ); + deviceStore.updatePorts(PID, DID1, pds); + + DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, + new DefaultPortDescription(P1, false)); + assertEquals(PORT_UPDATED, event.type()); + assertDevice(DID1, SW1, 
event.subject()); + assertEquals(P1, event.port().number()); + assertFalse("Port is disabled", event.port().isEnabled()); + + } + + @Test + public final void testUpdatePortStatusAncillary() { + putDeviceAncillary(DID1, SW1); + putDevice(DID1, SW1); + List<PortDescription> pds = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, true, A1) + ); + deviceStore.updatePorts(PID, DID1, pds); + + DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, + new DefaultPortDescription(P1, false, A1_2)); + assertEquals(PORT_UPDATED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertEquals(P1, event.port().number()); + assertAnnotationsEquals(event.port().annotations(), A1, A1_2); + assertFalse("Port is disabled", event.port().isEnabled()); + + DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, + new DefaultPortDescription(P1, true)); + assertNull("Ancillary is ignored if primary exists", event2); + + // but, Ancillary annotation update will be notified + DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, + new DefaultPortDescription(P1, true, A2)); + assertEquals(PORT_UPDATED, event3.type()); + assertDevice(DID1, SW1, event3.subject()); + assertEquals(P1, event3.port().number()); + assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2); + assertFalse("Port is disabled", event3.port().isEnabled()); + + // port only reported from Ancillary will be notified as down + DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, + new DefaultPortDescription(P2, true)); + assertEquals(PORT_ADDED, event4.type()); + assertDevice(DID1, SW1, event4.subject()); + assertEquals(P2, event4.port().number()); + assertAnnotationsEquals(event4.port().annotations()); + assertFalse("Port is disabled if not given from primary provider", + event4.port().isEnabled()); + } + + @Test + public final void testGetPorts() { + putDevice(DID1, SW1); + putDevice(DID2, SW1); + List<PortDescription> pds = Arrays.<PortDescription>asList( + new 
DefaultPortDescription(P1, true), + new DefaultPortDescription(P2, true) + ); + deviceStore.updatePorts(PID, DID1, pds); + + Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2); + List<Port> ports = deviceStore.getPorts(DID1); + for (Port port : ports) { + assertTrue("Port is enabled", port.isEnabled()); + assertTrue("PortNumber is one of expected", + expectedPorts.remove(port.number())); + } + assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty()); + + + assertTrue("DID2 has no ports", deviceStore.getPorts(DID2).isEmpty()); + } + + @Test + public final void testGetPort() { + putDevice(DID1, SW1); + putDevice(DID2, SW1); + List<PortDescription> pds = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, true), + new DefaultPortDescription(P2, false) + ); + deviceStore.updatePorts(PID, DID1, pds); + + Port port1 = deviceStore.getPort(DID1, P1); + assertEquals(P1, port1.number()); + assertTrue("Port is enabled", port1.isEnabled()); + + Port port2 = deviceStore.getPort(DID1, P2); + assertEquals(P2, port2.number()); + assertFalse("Port is disabled", port2.isEnabled()); + + Port port3 = deviceStore.getPort(DID1, P3); + assertNull("P3 not expected", port3); + } + + @Test + public final void testRemoveDevice() { + putDevice(DID1, SW1, A1); + List<PortDescription> pds = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, true, A2) + ); + deviceStore.updatePorts(PID, DID1, pds); + putDevice(DID2, SW1); + + assertEquals(2, deviceStore.getDeviceCount()); + assertEquals(1, deviceStore.getPorts(DID1).size()); + assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1); + assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2); + + DeviceEvent event = deviceStore.removeDevice(DID1); + assertEquals(DEVICE_REMOVED, event.type()); + assertDevice(DID1, SW1, event.subject()); + + assertEquals(1, deviceStore.getDeviceCount()); + assertEquals(0, deviceStore.getPorts(DID1).size()); + + // putBack Device, Port 
w/o annotation + putDevice(DID1, SW1); + List<PortDescription> pds2 = Arrays.<PortDescription>asList( + new DefaultPortDescription(P1, true) + ); + deviceStore.updatePorts(PID, DID1, pds2); + + // annotations should not survive + assertEquals(2, deviceStore.getDeviceCount()); + assertEquals(1, deviceStore.getPorts(DID1).size()); + assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations()); + assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations()); + } + + // If Delegates should be called only on remote events, + // then Simple* should never call them, thus not test required. + // TODO add test for Port events when we have them + @Ignore("Ignore until Delegate spec. is clear.") + @Test + public final void testEvents() throws InterruptedException { + final CountDownLatch addLatch = new CountDownLatch(1); + DeviceStoreDelegate checkAdd = new DeviceStoreDelegate() { + @Override + public void notify(DeviceEvent event) { + assertEquals(DEVICE_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + addLatch.countDown(); + } + }; + final CountDownLatch updateLatch = new CountDownLatch(1); + DeviceStoreDelegate checkUpdate = new DeviceStoreDelegate() { + @Override + public void notify(DeviceEvent event) { + assertEquals(DEVICE_UPDATED, event.type()); + assertDevice(DID1, SW2, event.subject()); + updateLatch.countDown(); + } + }; + final CountDownLatch removeLatch = new CountDownLatch(1); + DeviceStoreDelegate checkRemove = new DeviceStoreDelegate() { + @Override + public void notify(DeviceEvent event) { + assertEquals(DEVICE_REMOVED, event.type()); + assertDevice(DID1, SW2, event.subject()); + removeLatch.countDown(); + } + }; + + DeviceDescription description = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW1, SN, CID); + deviceStore.setDelegate(checkAdd); + deviceStore.createOrUpdateDevice(PID, DID1, description); + assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS)); + + + DeviceDescription description2 = + 
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW2, SN, CID); + deviceStore.unsetDelegate(checkAdd); + deviceStore.setDelegate(checkUpdate); + deviceStore.createOrUpdateDevice(PID, DID1, description2); + assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS)); + + deviceStore.unsetDelegate(checkUpdate); + deviceStore.setDelegate(checkRemove); + deviceStore.removeDevice(DID1); + assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS)); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java new file mode 100644 index 00000000..3b8f1d35 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java @@ -0,0 +1,327 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import com.google.common.base.Function; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.NewConcurrentHashMap; +import org.onosproject.net.DeviceId; +import org.onosproject.net.flow.CompletedBatchOperation; +import org.onosproject.net.flow.DefaultFlowEntry; +import org.onosproject.net.flow.FlowEntry; +import org.onosproject.net.flow.FlowEntry.FlowEntryState; +import org.onosproject.net.flow.FlowId; +import org.onosproject.net.flow.FlowRule; +import org.onosproject.net.flow.FlowRuleBatchEntry; +import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation; +import org.onosproject.net.flow.FlowRuleBatchEvent; +import org.onosproject.net.flow.FlowRuleBatchOperation; +import org.onosproject.net.flow.FlowRuleBatchRequest; +import org.onosproject.net.flow.FlowRuleEvent; +import org.onosproject.net.flow.FlowRuleEvent.Type; +import org.onosproject.net.flow.FlowRuleStore; +import org.onosproject.net.flow.FlowRuleStoreDelegate; +import org.onosproject.net.flow.StoredFlowEntry; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import 
java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; +import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages inventory of flow rules using trivial in-memory implementation. + */ +@Component(immediate = true) +@Service +public class SimpleFlowRuleStore + extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate> + implements FlowRuleStore { + + private final Logger log = getLogger(getClass()); + + + // inner Map is Device flow table + // inner Map value (FlowId synonym list) must be synchronized before modifying + private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>> + flowEntries = new ConcurrentHashMap<>(); + + private final AtomicInteger localBatchIdGen = new AtomicInteger(); + + // TODO: make this configurable + private int pendingFutureTimeoutMinutes = 5; + + private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = + CacheBuilder.newBuilder() + .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES) + .removalListener(new TimeoutFuture()) + .build(); + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + flowEntries.clear(); + log.info("Stopped"); + } + + + @Override + public int getFlowRuleCount() { + int sum = 0; + for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft : flowEntries.values()) { + for (List<StoredFlowEntry> fes : ft.values()) { + sum += fes.size(); + } + } + return sum; + } + + private static NewConcurrentHashMap<FlowId, List<StoredFlowEntry>> lazyEmptyFlowTable() { + return NewConcurrentHashMap.<FlowId, List<StoredFlowEntry>>ifNeeded(); + } + + /** + * Returns the flow table for specified device. + * + * @param deviceId identifier of the device + * @return Map representing Flow Table of given device. 
+ */ + private ConcurrentMap<FlowId, List<StoredFlowEntry>> getFlowTable(DeviceId deviceId) { + return createIfAbsentUnchecked(flowEntries, + deviceId, lazyEmptyFlowTable()); + } + + private List<StoredFlowEntry> getFlowEntries(DeviceId deviceId, FlowId flowId) { + final ConcurrentMap<FlowId, List<StoredFlowEntry>> flowTable = getFlowTable(deviceId); + List<StoredFlowEntry> r = flowTable.get(flowId); + if (r == null) { + final List<StoredFlowEntry> concurrentlyAdded; + r = new CopyOnWriteArrayList<>(); + concurrentlyAdded = flowTable.putIfAbsent(flowId, r); + if (concurrentlyAdded != null) { + return concurrentlyAdded; + } + } + return r; + } + + private FlowEntry getFlowEntryInternal(DeviceId deviceId, FlowRule rule) { + List<StoredFlowEntry> fes = getFlowEntries(deviceId, rule.id()); + for (StoredFlowEntry fe : fes) { + if (fe.equals(rule)) { + return fe; + } + } + return null; + } + + @Override + public FlowEntry getFlowEntry(FlowRule rule) { + return getFlowEntryInternal(rule.deviceId(), rule); + } + + @Override + public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) { + // flatten and make iterator unmodifiable + return FluentIterable.from(getFlowTable(deviceId).values()) + .transformAndConcat( + new Function<List<StoredFlowEntry>, Iterable<? extends FlowEntry>>() { + + @Override + public Iterable<? extends FlowEntry> apply( + List<StoredFlowEntry> input) { + return Collections.unmodifiableList(input); + } + }); + } + + @Override + public void storeFlowRule(FlowRule rule) { + storeFlowRuleInternal(rule); + } + + private void storeFlowRuleInternal(FlowRule rule) { + StoredFlowEntry f = new DefaultFlowEntry(rule); + final DeviceId did = f.deviceId(); + final FlowId fid = f.id(); + List<StoredFlowEntry> existing = getFlowEntries(did, fid); + synchronized (existing) { + for (StoredFlowEntry fe : existing) { + if (fe.equals(rule)) { + // was already there? 
ignore + return; + } + } + // new flow rule added + existing.add(f); + } + } + + @Override + public void deleteFlowRule(FlowRule rule) { + + List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id()); + + synchronized (entries) { + for (StoredFlowEntry entry : entries) { + if (entry.equals(rule)) { + synchronized (entry) { + entry.setState(FlowEntryState.PENDING_REMOVE); + } + } + } + } + + + //log.warn("Cannot find rule {}", rule); + } + + @Override + public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) { + // check if this new rule is an update to an existing entry + List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id()); + synchronized (entries) { + for (StoredFlowEntry stored : entries) { + if (stored.equals(rule)) { + synchronized (stored) { + stored.setBytes(rule.bytes()); + stored.setLife(rule.life()); + stored.setPackets(rule.packets()); + if (stored.state() == FlowEntryState.PENDING_ADD) { + stored.setState(FlowEntryState.ADDED); + // TODO: Do we need to change `rule` state? + return new FlowRuleEvent(Type.RULE_ADDED, rule); + } + return new FlowRuleEvent(Type.RULE_UPDATED, rule); + } + } + } + } + + // should not reach here + // storeFlowRule was expected to be called + log.error("FlowRule was not found in store {} to update", rule); + + //flowEntries.put(did, rule); + return null; + } + + @Override + public FlowRuleEvent removeFlowRule(FlowEntry rule) { + // This is where one could mark a rule as removed and still keep it in the store. 
+ final DeviceId did = rule.deviceId(); + + List<StoredFlowEntry> entries = getFlowEntries(did, rule.id()); + synchronized (entries) { + if (entries.remove(rule)) { + return new FlowRuleEvent(RULE_REMOVED, rule); + } + } + return null; + } + + @Override + public void storeBatch( + FlowRuleBatchOperation operation) { + List<FlowRuleBatchEntry> toAdd = new ArrayList<>(); + List<FlowRuleBatchEntry> toRemove = new ArrayList<>(); + + for (FlowRuleBatchEntry entry : operation.getOperations()) { + final FlowRule flowRule = entry.target(); + if (entry.operator().equals(FlowRuleOperation.ADD)) { + if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) { + storeFlowRule(flowRule); + toAdd.add(entry); + } + } else if (entry.operator().equals(FlowRuleOperation.REMOVE)) { + if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) { + deleteFlowRule(flowRule); + toRemove.add(entry); + } + } else { + throw new UnsupportedOperationException("Unsupported operation type"); + } + } + + if (toAdd.isEmpty() && toRemove.isEmpty()) { + notifyDelegate(FlowRuleBatchEvent.completed( + new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), + new CompletedBatchOperation(true, Collections.emptySet(), + operation.deviceId()))); + return; + } + + SettableFuture<CompletedBatchOperation> r = SettableFuture.create(); + final int batchId = localBatchIdGen.incrementAndGet(); + + pendingFutures.put(batchId, r); + + toAdd.addAll(toRemove); + notifyDelegate(FlowRuleBatchEvent.requested( + new FlowRuleBatchRequest(batchId, Sets.newHashSet(toAdd)), operation.deviceId())); + + } + + @Override + public void batchOperationComplete(FlowRuleBatchEvent event) { + final Long batchId = event.subject().batchId(); + SettableFuture<CompletedBatchOperation> future + = pendingFutures.getIfPresent(batchId); + if (future != null) { + future.set(event.result()); + pendingFutures.invalidate(batchId); + } + notifyDelegate(event); + } + + private static final class 
TimeoutFuture + implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> { + @Override + public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) { + // wrapping in ExecutionException to support Future.get + if (notification.wasEvicted()) { + notification.getValue() + .setException(new ExecutionException("Timed out", + new TimeoutException())); + } + } + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java new file mode 100644 index 00000000..71de3e13 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java @@ -0,0 +1,717 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.NewConcurrentHashMap; +import org.onosproject.core.DefaultGroupId; +import org.onosproject.core.GroupId; +import org.onosproject.net.DeviceId; +import org.onosproject.net.group.DefaultGroup; +import org.onosproject.net.group.DefaultGroupDescription; +import org.onosproject.net.group.Group; +import org.onosproject.net.group.Group.GroupState; +import org.onosproject.net.group.GroupBucket; +import org.onosproject.net.group.GroupBuckets; +import org.onosproject.net.group.GroupDescription; +import org.onosproject.net.group.GroupEvent; +import org.onosproject.net.group.GroupEvent.Type; +import org.onosproject.net.group.GroupKey; +import org.onosproject.net.group.GroupOperation; +import org.onosproject.net.group.GroupStore; +import org.onosproject.net.group.GroupStoreDelegate; +import org.onosproject.net.group.StoredGroupBucketEntry; +import org.onosproject.net.group.StoredGroupEntry; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Sets; + +/** + * Manages inventory of group entries using trivial in-memory implementation. 
+ */ +@Component(immediate = true) +@Service +public class SimpleGroupStore + extends AbstractStore<GroupEvent, GroupStoreDelegate> + implements GroupStore { + + private final Logger log = getLogger(getClass()); + + private final int dummyId = 0xffffffff; + private final GroupId dummyGroupId = new DefaultGroupId(dummyId); + + // inner Map is per device group table + private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>> + groupEntriesByKey = new ConcurrentHashMap<>(); + private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>> + groupEntriesById = new ConcurrentHashMap<>(); + private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>> + pendingGroupEntriesByKey = new ConcurrentHashMap<>(); + private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>> + extraneousGroupEntriesById = new ConcurrentHashMap<>(); + + private final HashMap<DeviceId, Boolean> deviceAuditStatus = + new HashMap<DeviceId, Boolean>(); + + private final AtomicInteger groupIdGen = new AtomicInteger(); + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + groupEntriesByKey.clear(); + groupEntriesById.clear(); + log.info("Stopped"); + } + + private static NewConcurrentHashMap<GroupKey, StoredGroupEntry> + lazyEmptyGroupKeyTable() { + return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded(); + } + + private static NewConcurrentHashMap<GroupId, StoredGroupEntry> + lazyEmptyGroupIdTable() { + return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded(); + } + + private static NewConcurrentHashMap<GroupKey, StoredGroupEntry> + lazyEmptyPendingGroupKeyTable() { + return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded(); + } + + private static NewConcurrentHashMap<GroupId, Group> + lazyEmptyExtraneousGroupIdTable() { + return NewConcurrentHashMap.<GroupId, Group>ifNeeded(); + } + + /** + * Returns the group key table for specified device. 
+ * + * @param deviceId identifier of the device + * @return Map representing group key table of given device. + */ + private ConcurrentMap<GroupKey, StoredGroupEntry> getGroupKeyTable(DeviceId deviceId) { + return createIfAbsentUnchecked(groupEntriesByKey, + deviceId, lazyEmptyGroupKeyTable()); + } + + /** + * Returns the group id table for specified device. + * + * @param deviceId identifier of the device + * @return Map representing group key table of given device. + */ + private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) { + return createIfAbsentUnchecked(groupEntriesById, + deviceId, lazyEmptyGroupIdTable()); + } + + /** + * Returns the pending group key table for specified device. + * + * @param deviceId identifier of the device + * @return Map representing group key table of given device. + */ + private ConcurrentMap<GroupKey, StoredGroupEntry> + getPendingGroupKeyTable(DeviceId deviceId) { + return createIfAbsentUnchecked(pendingGroupEntriesByKey, + deviceId, lazyEmptyPendingGroupKeyTable()); + } + + /** + * Returns the extraneous group id table for specified device. + * + * @param deviceId identifier of the device + * @return Map representing group key table of given device. + */ + private ConcurrentMap<GroupId, Group> + getExtraneousGroupIdTable(DeviceId deviceId) { + return createIfAbsentUnchecked(extraneousGroupEntriesById, + deviceId, + lazyEmptyExtraneousGroupIdTable()); + } + + /** + * Returns the number of groups for the specified device in the store. + * + * @return number of groups for the specified device + */ + @Override + public int getGroupCount(DeviceId deviceId) { + return (groupEntriesByKey.get(deviceId) != null) ? + groupEntriesByKey.get(deviceId).size() : 0; + } + + /** + * Returns the groups associated with a device. 
+ * + * @param deviceId the device ID + * + * @return the group entries + */ + @Override + public Iterable<Group> getGroups(DeviceId deviceId) { + // flatten and make iterator unmodifiable + return FluentIterable.from(getGroupKeyTable(deviceId).values()) + .transform( + new Function<StoredGroupEntry, Group>() { + + @Override + public Group apply( + StoredGroupEntry input) { + return input; + } + }); + } + + /** + * Returns the stored group entry. + * + * @param deviceId the device ID + * @param appCookie the group key + * + * @return a group associated with the key + */ + @Override + public Group getGroup(DeviceId deviceId, GroupKey appCookie) { + return (groupEntriesByKey.get(deviceId) != null) ? + groupEntriesByKey.get(deviceId).get(appCookie) : + null; + } + + @Override + public Group getGroup(DeviceId deviceId, GroupId groupId) { + return (groupEntriesById.get(deviceId) != null) ? + groupEntriesById.get(deviceId).get(groupId) : + null; + } + + private int getFreeGroupIdValue(DeviceId deviceId) { + int freeId = groupIdGen.incrementAndGet(); + + while (true) { + Group existing = ( + groupEntriesById.get(deviceId) != null) ? + groupEntriesById.get(deviceId).get(new DefaultGroupId(freeId)) : + null; + if (existing == null) { + existing = ( + extraneousGroupEntriesById.get(deviceId) != null) ? + extraneousGroupEntriesById.get(deviceId). + get(new DefaultGroupId(freeId)) : + null; + } + if (existing != null) { + freeId = groupIdGen.incrementAndGet(); + } else { + break; + } + } + return freeId; + } + + /** + * Stores a new group entry using the information from group description. 
+ * + * @param groupDesc group description to be used to create group entry + */ + @Override + public void storeGroupDescription(GroupDescription groupDesc) { + // Check if a group is existing with the same key + if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) { + return; + } + + if (deviceAuditStatus.get(groupDesc.deviceId()) == null) { + // Device group audit has not completed yet + // Add this group description to pending group key table + // Create a group entry object with Dummy Group ID + StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc); + group.setState(GroupState.WAITING_AUDIT_COMPLETE); + ConcurrentMap<GroupKey, StoredGroupEntry> pendingKeyTable = + getPendingGroupKeyTable(groupDesc.deviceId()); + pendingKeyTable.put(groupDesc.appCookie(), group); + return; + } + + storeGroupDescriptionInternal(groupDesc); + } + + private void storeGroupDescriptionInternal(GroupDescription groupDesc) { + // Check if a group is existing with the same key + if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) { + return; + } + + GroupId id = null; + if (groupDesc.givenGroupId() == null) { + // Get a new group identifier + id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId())); + } else { + id = new DefaultGroupId(groupDesc.givenGroupId()); + } + // Create a group entry object + StoredGroupEntry group = new DefaultGroup(id, groupDesc); + // Insert the newly created group entry into concurrent key and id maps + ConcurrentMap<GroupKey, StoredGroupEntry> keyTable = + getGroupKeyTable(groupDesc.deviceId()); + keyTable.put(groupDesc.appCookie(), group); + ConcurrentMap<GroupId, StoredGroupEntry> idTable = + getGroupIdTable(groupDesc.deviceId()); + idTable.put(id, group); + notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED, + group)); + } + + /** + * Updates the existing group entry with the information + * from group description. 
+ * + * @param deviceId the device ID + * @param oldAppCookie the current group key + * @param type update type + * @param newBuckets group buckets for updates + * @param newAppCookie optional new group key + */ + @Override + public void updateGroupDescription(DeviceId deviceId, + GroupKey oldAppCookie, + UpdateType type, + GroupBuckets newBuckets, + GroupKey newAppCookie) { + // Check if a group is existing with the provided key + Group oldGroup = getGroup(deviceId, oldAppCookie); + if (oldGroup == null) { + return; + } + + List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup, + type, + newBuckets); + if (newBucketList != null) { + // Create a new group object from the old group + GroupBuckets updatedBuckets = new GroupBuckets(newBucketList); + GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie; + GroupDescription updatedGroupDesc = new DefaultGroupDescription( + oldGroup.deviceId(), + oldGroup.type(), + updatedBuckets, + newCookie, + oldGroup.givenGroupId(), + oldGroup.appId()); + StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(), + updatedGroupDesc); + newGroup.setState(GroupState.PENDING_UPDATE); + newGroup.setLife(oldGroup.life()); + newGroup.setPackets(oldGroup.packets()); + newGroup.setBytes(oldGroup.bytes()); + // Remove the old entry from maps and add new entry using new key + ConcurrentMap<GroupKey, StoredGroupEntry> keyTable = + getGroupKeyTable(oldGroup.deviceId()); + ConcurrentMap<GroupId, StoredGroupEntry> idTable = + getGroupIdTable(oldGroup.deviceId()); + keyTable.remove(oldGroup.appCookie()); + idTable.remove(oldGroup.id()); + keyTable.put(newGroup.appCookie(), newGroup); + idTable.put(newGroup.id(), newGroup); + notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup)); + } + } + + private List<GroupBucket> getUpdatedBucketList(Group oldGroup, + UpdateType type, + GroupBuckets buckets) { + GroupBuckets oldBuckets = oldGroup.buckets(); + List<GroupBucket> newBucketList = new 
ArrayList<GroupBucket>( + oldBuckets.buckets()); + boolean groupDescUpdated = false; + + if (type == UpdateType.ADD) { + // Check if the any of the new buckets are part of + // the old bucket list + for (GroupBucket addBucket:buckets.buckets()) { + if (!newBucketList.contains(addBucket)) { + newBucketList.add(addBucket); + groupDescUpdated = true; + } + } + } else if (type == UpdateType.REMOVE) { + // Check if the to be removed buckets are part of the + // old bucket list + for (GroupBucket removeBucket:buckets.buckets()) { + if (newBucketList.contains(removeBucket)) { + newBucketList.remove(removeBucket); + groupDescUpdated = true; + } + } + } + + if (groupDescUpdated) { + return newBucketList; + } else { + return null; + } + } + + /** + * Triggers deleting the existing group entry. + * + * @param deviceId the device ID + * @param appCookie the group key + */ + @Override + public void deleteGroupDescription(DeviceId deviceId, + GroupKey appCookie) { + // Check if a group is existing with the provided key + StoredGroupEntry existing = (groupEntriesByKey.get(deviceId) != null) ? + groupEntriesByKey.get(deviceId).get(appCookie) : + null; + if (existing == null) { + return; + } + + synchronized (existing) { + existing.setState(GroupState.PENDING_DELETE); + } + notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing)); + } + + /** + * Stores a new group entry, or updates an existing entry. + * + * @param group group entry + */ + @Override + public void addOrUpdateGroupEntry(Group group) { + // check if this new entry is an update to an existing entry + StoredGroupEntry existing = (groupEntriesById.get( + group.deviceId()) != null) ? 
+ groupEntriesById.get(group.deviceId()).get(group.id()) : + null; + GroupEvent event = null; + + if (existing != null) { + synchronized (existing) { + for (GroupBucket bucket:group.buckets().buckets()) { + Optional<GroupBucket> matchingBucket = + existing.buckets().buckets() + .stream() + .filter((existingBucket)->(existingBucket.equals(bucket))) + .findFirst(); + if (matchingBucket.isPresent()) { + ((StoredGroupBucketEntry) matchingBucket. + get()).setPackets(bucket.packets()); + ((StoredGroupBucketEntry) matchingBucket. + get()).setBytes(bucket.bytes()); + } else { + log.warn("addOrUpdateGroupEntry: No matching " + + "buckets to update stats"); + } + } + existing.setLife(group.life()); + existing.setPackets(group.packets()); + existing.setBytes(group.bytes()); + if (existing.state() == GroupState.PENDING_ADD) { + existing.setState(GroupState.ADDED); + event = new GroupEvent(Type.GROUP_ADDED, existing); + } else { + if (existing.state() == GroupState.PENDING_UPDATE) { + existing.setState(GroupState.ADDED); + } + event = new GroupEvent(Type.GROUP_UPDATED, existing); + } + } + } + + if (event != null) { + notifyDelegate(event); + } + } + + /** + * Removes the group entry from store. + * + * @param group group entry + */ + @Override + public void removeGroupEntry(Group group) { + StoredGroupEntry existing = (groupEntriesById.get( + group.deviceId()) != null) ? 
+ groupEntriesById.get(group.deviceId()).get(group.id()) : + null; + + if (existing != null) { + ConcurrentMap<GroupKey, StoredGroupEntry> keyTable = + getGroupKeyTable(existing.deviceId()); + ConcurrentMap<GroupId, StoredGroupEntry> idTable = + getGroupIdTable(existing.deviceId()); + idTable.remove(existing.id()); + keyTable.remove(existing.appCookie()); + notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing)); + } + } + + @Override + public void deviceInitialAuditCompleted(DeviceId deviceId, + boolean completed) { + synchronized (deviceAuditStatus) { + if (completed) { + log.debug("deviceInitialAuditCompleted: AUDIT " + + "completed for device {}", deviceId); + deviceAuditStatus.put(deviceId, true); + // Execute all pending group requests + ConcurrentMap<GroupKey, StoredGroupEntry> pendingGroupRequests = + getPendingGroupKeyTable(deviceId); + for (Group group:pendingGroupRequests.values()) { + GroupDescription tmp = new DefaultGroupDescription( + group.deviceId(), + group.type(), + group.buckets(), + group.appCookie(), + group.givenGroupId(), + group.appId()); + storeGroupDescriptionInternal(tmp); + } + getPendingGroupKeyTable(deviceId).clear(); + } else { + if (deviceAuditStatus.get(deviceId)) { + log.debug("deviceInitialAuditCompleted: Clearing AUDIT " + + "status for device {}", deviceId); + deviceAuditStatus.put(deviceId, false); + } + } + } + } + + @Override + public boolean deviceInitialAuditStatus(DeviceId deviceId) { + synchronized (deviceAuditStatus) { + return (deviceAuditStatus.get(deviceId) != null) + ? deviceAuditStatus.get(deviceId) : false; + } + } + + @Override + public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) { + + StoredGroupEntry existing = (groupEntriesById.get( + deviceId) != null) ? 
+ groupEntriesById.get(deviceId).get(operation.groupId()) : + null; + + if (existing == null) { + log.warn("No group entry with ID {} found ", operation.groupId()); + return; + } + + switch (operation.opType()) { + case ADD: + notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing)); + break; + case MODIFY: + notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing)); + break; + case DELETE: + notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing)); + break; + default: + log.warn("Unknown group operation type {}", operation.opType()); + } + + ConcurrentMap<GroupKey, StoredGroupEntry> keyTable = + getGroupKeyTable(existing.deviceId()); + ConcurrentMap<GroupId, StoredGroupEntry> idTable = + getGroupIdTable(existing.deviceId()); + idTable.remove(existing.id()); + keyTable.remove(existing.appCookie()); + } + + @Override + public void addOrUpdateExtraneousGroupEntry(Group group) { + ConcurrentMap<GroupId, Group> extraneousIdTable = + getExtraneousGroupIdTable(group.deviceId()); + extraneousIdTable.put(group.id(), group); + // Check the reference counter + if (group.referenceCount() == 0) { + notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group)); + } + } + + @Override + public void removeExtraneousGroupEntry(Group group) { + ConcurrentMap<GroupId, Group> extraneousIdTable = + getExtraneousGroupIdTable(group.deviceId()); + extraneousIdTable.remove(group.id()); + } + + @Override + public Iterable<Group> getExtraneousGroups(DeviceId deviceId) { + // flatten and make iterator unmodifiable + return FluentIterable.from( + getExtraneousGroupIdTable(deviceId).values()); + } + + @Override + public void pushGroupMetrics(DeviceId deviceId, + Collection<Group> groupEntries) { + boolean deviceInitialAuditStatus = + deviceInitialAuditStatus(deviceId); + Set<Group> southboundGroupEntries = + Sets.newHashSet(groupEntries); + Set<Group> storedGroupEntries = + Sets.newHashSet(getGroups(deviceId)); + Set<Group> extraneousStoredEntries = + 
Sets.newHashSet(getExtraneousGroups(deviceId)); + + log.trace("pushGroupMetrics: Displaying all ({}) " + + "southboundGroupEntries for device {}", + southboundGroupEntries.size(), + deviceId); + for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) { + Group group = it.next(); + log.trace("Group {} in device {}", group, deviceId); + } + + log.trace("Displaying all ({}) stored group entries for device {}", + storedGroupEntries.size(), + deviceId); + for (Iterator<Group> it1 = storedGroupEntries.iterator(); + it1.hasNext();) { + Group group = it1.next(); + log.trace("Stored Group {} for device {}", group, deviceId); + } + + for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) { + Group group = it2.next(); + if (storedGroupEntries.remove(group)) { + // we both have the group, let's update some info then. + log.trace("Group AUDIT: group {} exists " + + "in both planes for device {}", + group.id(), deviceId); + groupAdded(group); + it2.remove(); + } + } + for (Group group : southboundGroupEntries) { + if (getGroup(group.deviceId(), group.id()) != null) { + // There is a group existing with the same id + // It is possible that group update is + // in progress while we got a stale info from switch + if (!storedGroupEntries.remove(getGroup( + group.deviceId(), group.id()))) { + log.warn("Group AUDIT: Inconsistent state:" + + "Group exists in ID based table while " + + "not present in key based table"); + } + } else { + // there are groups in the switch that aren't in the store + log.trace("Group AUDIT: extraneous group {} exists " + + "in data plane for device {}", + group.id(), deviceId); + extraneousStoredEntries.remove(group); + extraneousGroup(group); + } + } + for (Group group : storedGroupEntries) { + // there are groups in the store that aren't in the switch + log.trace("Group AUDIT: group {} missing " + + "in data plane for device {}", + group.id(), deviceId); + groupMissing(group); + } + for (Group group : 
extraneousStoredEntries) { + // there are groups in the extraneous store that + // aren't in the switch + log.trace("Group AUDIT: clearing extransoeus group {} " + + "from store for device {}", + group.id(), deviceId); + removeExtraneousGroupEntry(group); + } + + if (!deviceInitialAuditStatus) { + log.debug("Group AUDIT: Setting device {} initial " + + "AUDIT completed", deviceId); + deviceInitialAuditCompleted(deviceId, true); + } + } + + private void groupMissing(Group group) { + switch (group.state()) { + case PENDING_DELETE: + log.debug("Group {} delete confirmation from device {}", + group, group.deviceId()); + removeGroupEntry(group); + break; + case ADDED: + case PENDING_ADD: + case PENDING_UPDATE: + log.debug("Group {} is in store but not on device {}", + group, group.deviceId()); + StoredGroupEntry existing = (groupEntriesById.get( + group.deviceId()) != null) ? + groupEntriesById.get(group.deviceId()).get(group.id()) : + null; + log.trace("groupMissing: group " + + "entry {} in device {} moving " + + "from {} to PENDING_ADD", + existing.id(), + existing.deviceId(), + existing.state()); + existing.setState(Group.GroupState.PENDING_ADD); + notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED, + group)); + break; + default: + log.debug("Group {} has not been installed.", group); + break; + } + } + + private void extraneousGroup(Group group) { + log.debug("Group {} is on device {} but not in store.", + group, group.deviceId()); + addOrUpdateExtraneousGroupEntry(group); + } + + private void groupAdded(Group group) { + log.trace("Group {} Added or Updated in device {}", + group, group.deviceId()); + addOrUpdateGroupEntry(group); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStoreTest.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStoreTest.java new file mode 100644 index 00000000..dd6c8a58 --- /dev/null +++ 
b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStoreTest.java @@ -0,0 +1,482 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import static org.junit.Assert.assertEquals; +import static org.onosproject.net.DeviceId.deviceId; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.packet.MacAddress; +import org.onlab.packet.MplsLabel; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.DefaultApplicationId; +import org.onosproject.core.GroupId; +import org.onosproject.net.DeviceId; +import org.onosproject.net.PortNumber; +import org.onosproject.net.flow.DefaultTrafficTreatment; +import org.onosproject.net.flow.TrafficTreatment; +import org.onosproject.net.group.DefaultGroup; +import org.onosproject.net.group.DefaultGroupBucket; +import org.onosproject.net.group.DefaultGroupDescription; +import org.onosproject.net.group.DefaultGroupKey; +import org.onosproject.net.group.Group; +import org.onosproject.net.group.GroupBucket; +import org.onosproject.net.group.GroupBuckets; +import org.onosproject.net.group.GroupDescription; +import org.onosproject.net.group.GroupEvent; +import org.onosproject.net.group.GroupKey; +import 
org.onosproject.net.group.GroupOperation; +import org.onosproject.net.group.GroupStore.UpdateType; +import org.onosproject.net.group.GroupStoreDelegate; +import org.onosproject.net.group.StoredGroupBucketEntry; +import org.onosproject.net.group.StoredGroupEntry; + +import com.google.common.collect.Iterables; + +/** + * Test of the simple DeviceStore implementation. + */ +public class SimpleGroupStoreTest { + + private SimpleGroupStore simpleGroupStore; + private final ApplicationId appId = + new DefaultApplicationId(2, "org.groupstore.test"); + + public static final DeviceId D1 = deviceId("of:1"); + + @Before + public void setUp() throws Exception { + simpleGroupStore = new SimpleGroupStore(); + simpleGroupStore.activate(); + } + + @After + public void tearDown() throws Exception { + simpleGroupStore.deactivate(); + } + + private class InternalGroupStoreDelegate + implements GroupStoreDelegate { + private GroupId createdGroupId = null; + private GroupKey createdGroupKey; + private GroupBuckets createdBuckets; + private GroupEvent.Type expectedEvent; + + public InternalGroupStoreDelegate(GroupKey key, + GroupBuckets buckets, + GroupEvent.Type expectedEvent) { + this.createdBuckets = buckets; + this.createdGroupKey = key; + this.expectedEvent = expectedEvent; + } + @Override + public void notify(GroupEvent event) { + assertEquals(expectedEvent, event.type()); + assertEquals(Group.Type.SELECT, event.subject().type()); + assertEquals(D1, event.subject().deviceId()); + assertEquals(createdGroupKey, event.subject().appCookie()); + assertEquals(createdBuckets.buckets(), event.subject().buckets().buckets()); + if (expectedEvent == GroupEvent.Type.GROUP_ADD_REQUESTED) { + createdGroupId = event.subject().id(); + assertEquals(Group.GroupState.PENDING_ADD, + event.subject().state()); + } else if (expectedEvent == GroupEvent.Type.GROUP_ADDED) { + createdGroupId = event.subject().id(); + assertEquals(Group.GroupState.ADDED, + event.subject().state()); + } else if (expectedEvent 
== GroupEvent.Type.GROUP_UPDATED) { + createdGroupId = event.subject().id(); + assertEquals(true, + event.subject().buckets(). + buckets().containsAll(createdBuckets.buckets())); + assertEquals(true, + createdBuckets.buckets(). + containsAll(event.subject().buckets().buckets())); + for (GroupBucket bucket:event.subject().buckets().buckets()) { + Optional<GroupBucket> matched = createdBuckets.buckets() + .stream() + .filter((expected) -> expected.equals(bucket)) + .findFirst(); + assertEquals(matched.get().packets(), + bucket.packets()); + assertEquals(matched.get().bytes(), + bucket.bytes()); + } + assertEquals(Group.GroupState.ADDED, + event.subject().state()); + } else if (expectedEvent == GroupEvent.Type.GROUP_UPDATE_REQUESTED) { + assertEquals(Group.GroupState.PENDING_UPDATE, + event.subject().state()); + } else if (expectedEvent == GroupEvent.Type.GROUP_REMOVE_REQUESTED) { + assertEquals(Group.GroupState.PENDING_DELETE, + event.subject().state()); + } else if (expectedEvent == GroupEvent.Type.GROUP_REMOVED) { + createdGroupId = event.subject().id(); + assertEquals(Group.GroupState.PENDING_DELETE, + event.subject().state()); + } else if (expectedEvent == GroupEvent.Type.GROUP_ADD_FAILED) { + createdGroupId = event.subject().id(); + assertEquals(Group.GroupState.PENDING_ADD, + event.subject().state()); + } else if (expectedEvent == GroupEvent.Type.GROUP_UPDATE_FAILED) { + createdGroupId = event.subject().id(); + assertEquals(Group.GroupState.PENDING_UPDATE, + event.subject().state()); + } else if (expectedEvent == GroupEvent.Type.GROUP_REMOVE_FAILED) { + createdGroupId = event.subject().id(); + assertEquals(Group.GroupState.PENDING_DELETE, + event.subject().state()); + } + } + + public void verifyGroupId(GroupId id) { + assertEquals(createdGroupId, id); + } + } + + /** + * Tests group store operations. 
The following operations are tested: + * a)Tests device group audit completion status change + * b)Tests storeGroup operation + * c)Tests getGroupCount operation + * d)Tests getGroup operation + * e)Tests getGroups operation + * f)Tests addOrUpdateGroupEntry operation from southbound + * g)Tests updateGroupDescription for ADD operation from northbound + * h)Tests updateGroupDescription for REMOVE operation from northbound + * i)Tests deleteGroupDescription operation from northbound + * j)Tests removeGroupEntry operation from southbound + */ + @Test + public void testGroupStoreOperations() { + // Set the Device AUDIT completed in the store + simpleGroupStore.deviceInitialAuditCompleted(D1, true); + + // Testing storeGroup operation + GroupKey newKey = new DefaultGroupKey("group1".getBytes()); + testStoreAndGetGroup(newKey); + + // Testing addOrUpdateGroupEntry operation from southbound + GroupKey currKey = newKey; + testAddGroupEntryFromSB(currKey); + + // Testing updateGroupDescription for ADD operation from northbound + newKey = new DefaultGroupKey("group1AddBuckets".getBytes()); + testAddBuckets(currKey, newKey); + + // Testing updateGroupDescription for REMOVE operation from northbound + currKey = newKey; + newKey = new DefaultGroupKey("group1RemoveBuckets".getBytes()); + testRemoveBuckets(currKey, newKey); + + // Testing addOrUpdateGroupEntry operation from southbound + currKey = newKey; + testUpdateGroupEntryFromSB(currKey); + + // Testing deleteGroupDescription operation from northbound + testDeleteGroup(currKey); + + // Testing removeGroupEntry operation from southbound + testRemoveGroupFromSB(currKey); + } + + // Testing storeGroup operation + private void testStoreAndGetGroup(GroupKey key) { + PortNumber[] ports = {PortNumber.portNumber(31), + PortNumber.portNumber(32)}; + List<PortNumber> outPorts = new ArrayList<PortNumber>(); + outPorts.addAll(Arrays.asList(ports)); + + List<GroupBucket> buckets = new ArrayList<GroupBucket>(); + for (PortNumber 
portNumber: outPorts) { + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); + tBuilder.setOutput(portNumber) + .setEthDst(MacAddress.valueOf("00:00:00:00:00:02")) + .setEthSrc(MacAddress.valueOf("00:00:00:00:00:01")) + .pushMpls() + .setMpls(MplsLabel.mplsLabel(106)); + buckets.add(DefaultGroupBucket.createSelectGroupBucket( + tBuilder.build())); + } + GroupBuckets groupBuckets = new GroupBuckets(buckets); + GroupDescription groupDesc = new DefaultGroupDescription( + D1, + Group.Type.SELECT, + groupBuckets, + key, + null, + appId); + InternalGroupStoreDelegate checkStoreGroupDelegate = + new InternalGroupStoreDelegate(key, + groupBuckets, + GroupEvent.Type.GROUP_ADD_REQUESTED); + simpleGroupStore.setDelegate(checkStoreGroupDelegate); + // Testing storeGroup operation + simpleGroupStore.storeGroupDescription(groupDesc); + + // Testing getGroupCount operation + assertEquals(1, simpleGroupStore.getGroupCount(D1)); + + // Testing getGroup operation + Group createdGroup = simpleGroupStore.getGroup(D1, key); + checkStoreGroupDelegate.verifyGroupId(createdGroup.id()); + + // Testing getGroups operation + Iterable<Group> createdGroups = simpleGroupStore.getGroups(D1); + int groupCount = 0; + for (Group group:createdGroups) { + checkStoreGroupDelegate.verifyGroupId(group.id()); + groupCount++; + } + assertEquals(1, groupCount); + simpleGroupStore.unsetDelegate(checkStoreGroupDelegate); + } + + // Testing addOrUpdateGroupEntry operation from southbound + private void testAddGroupEntryFromSB(GroupKey currKey) { + Group existingGroup = simpleGroupStore.getGroup(D1, currKey); + + InternalGroupStoreDelegate addGroupEntryDelegate = + new InternalGroupStoreDelegate(currKey, + existingGroup.buckets(), + GroupEvent.Type.GROUP_ADDED); + simpleGroupStore.setDelegate(addGroupEntryDelegate); + simpleGroupStore.addOrUpdateGroupEntry(existingGroup); + simpleGroupStore.unsetDelegate(addGroupEntryDelegate); + } + + // Testing addOrUpdateGroupEntry operation from 
southbound + private void testUpdateGroupEntryFromSB(GroupKey currKey) { + Group existingGroup = simpleGroupStore.getGroup(D1, currKey); + int totalPkts = 0; + int totalBytes = 0; + List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(); + for (GroupBucket bucket:existingGroup.buckets().buckets()) { + StoredGroupBucketEntry newBucket = + (StoredGroupBucketEntry) + DefaultGroupBucket.createSelectGroupBucket(bucket.treatment()); + newBucket.setPackets(10); + newBucket.setBytes(10 * 256 * 8); + totalPkts += 10; + totalBytes += 10 * 256 * 8; + newBucketList.add(newBucket); + } + GroupBuckets updatedBuckets = new GroupBuckets(newBucketList); + Group updatedGroup = new DefaultGroup(existingGroup.id(), + existingGroup.deviceId(), + existingGroup.type(), + updatedBuckets); + ((StoredGroupEntry) updatedGroup).setPackets(totalPkts); + ((StoredGroupEntry) updatedGroup).setBytes(totalBytes); + + InternalGroupStoreDelegate updateGroupEntryDelegate = + new InternalGroupStoreDelegate(currKey, + updatedBuckets, + GroupEvent.Type.GROUP_UPDATED); + simpleGroupStore.setDelegate(updateGroupEntryDelegate); + simpleGroupStore.addOrUpdateGroupEntry(updatedGroup); + simpleGroupStore.unsetDelegate(updateGroupEntryDelegate); + } + + // Testing updateGroupDescription for ADD operation from northbound + private void testAddBuckets(GroupKey currKey, GroupKey addKey) { + Group existingGroup = simpleGroupStore.getGroup(D1, currKey); + List<GroupBucket> buckets = new ArrayList<GroupBucket>(); + buckets.addAll(existingGroup.buckets().buckets()); + + PortNumber[] newNeighborPorts = {PortNumber.portNumber(41), + PortNumber.portNumber(42)}; + List<PortNumber> newOutPorts = new ArrayList<PortNumber>(); + newOutPorts.addAll(Collections.singletonList(newNeighborPorts[0])); + + List<GroupBucket> toAddBuckets = new ArrayList<GroupBucket>(); + for (PortNumber portNumber: newOutPorts) { + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); + tBuilder.setOutput(portNumber) + 
.setEthDst(MacAddress.valueOf("00:00:00:00:00:03")) + .setEthSrc(MacAddress.valueOf("00:00:00:00:00:01")) + .pushMpls() + .setMpls(MplsLabel.mplsLabel(106)); + toAddBuckets.add(DefaultGroupBucket.createSelectGroupBucket( + tBuilder.build())); + } + GroupBuckets toAddGroupBuckets = new GroupBuckets(toAddBuckets); + buckets.addAll(toAddBuckets); + GroupBuckets updatedGroupBuckets = new GroupBuckets(buckets); + InternalGroupStoreDelegate updateGroupDescDelegate = + new InternalGroupStoreDelegate(addKey, + updatedGroupBuckets, + GroupEvent.Type.GROUP_UPDATE_REQUESTED); + simpleGroupStore.setDelegate(updateGroupDescDelegate); + simpleGroupStore.updateGroupDescription(D1, + currKey, + UpdateType.ADD, + toAddGroupBuckets, + addKey); + simpleGroupStore.unsetDelegate(updateGroupDescDelegate); + } + + // Testing updateGroupDescription for REMOVE operation from northbound + private void testRemoveBuckets(GroupKey currKey, GroupKey removeKey) { + Group existingGroup = simpleGroupStore.getGroup(D1, currKey); + List<GroupBucket> buckets = new ArrayList<GroupBucket>(); + buckets.addAll(existingGroup.buckets().buckets()); + + List<GroupBucket> toRemoveBuckets = new ArrayList<GroupBucket>(); + + // There should be 4 buckets in the current group + toRemoveBuckets.add(buckets.remove(0)); + toRemoveBuckets.add(buckets.remove(1)); + GroupBuckets toRemoveGroupBuckets = new GroupBuckets(toRemoveBuckets); + + GroupBuckets remainingGroupBuckets = new GroupBuckets(buckets); + InternalGroupStoreDelegate removeGroupDescDelegate = + new InternalGroupStoreDelegate(removeKey, + remainingGroupBuckets, + GroupEvent.Type.GROUP_UPDATE_REQUESTED); + simpleGroupStore.setDelegate(removeGroupDescDelegate); + simpleGroupStore.updateGroupDescription(D1, + currKey, + UpdateType.REMOVE, + toRemoveGroupBuckets, + removeKey); + simpleGroupStore.unsetDelegate(removeGroupDescDelegate); + } + + // Testing deleteGroupDescription operation from northbound + private void testDeleteGroup(GroupKey currKey) { + Group 
existingGroup = simpleGroupStore.getGroup(D1, currKey); + InternalGroupStoreDelegate deleteGroupDescDelegate = + new InternalGroupStoreDelegate(currKey, + existingGroup.buckets(), + GroupEvent.Type.GROUP_REMOVE_REQUESTED); + simpleGroupStore.setDelegate(deleteGroupDescDelegate); + simpleGroupStore.deleteGroupDescription(D1, currKey); + simpleGroupStore.unsetDelegate(deleteGroupDescDelegate); + } + + // Testing removeGroupEntry operation from southbound + private void testRemoveGroupFromSB(GroupKey currKey) { + Group existingGroup = simpleGroupStore.getGroup(D1, currKey); + InternalGroupStoreDelegate removeGroupEntryDelegate = + new InternalGroupStoreDelegate(currKey, + existingGroup.buckets(), + GroupEvent.Type.GROUP_REMOVED); + simpleGroupStore.setDelegate(removeGroupEntryDelegate); + simpleGroupStore.removeGroupEntry(existingGroup); + + // Testing getGroup operation + existingGroup = simpleGroupStore.getGroup(D1, currKey); + assertEquals(null, existingGroup); + assertEquals(0, Iterables.size(simpleGroupStore.getGroups(D1))); + assertEquals(0, simpleGroupStore.getGroupCount(D1)); + + simpleGroupStore.unsetDelegate(removeGroupEntryDelegate); + } + + @Test + public void testGroupOperationFailure() { + + simpleGroupStore.deviceInitialAuditCompleted(D1, true); + + ApplicationId appId = + new DefaultApplicationId(2, "org.groupstore.test"); + GroupKey key = new DefaultGroupKey("group1".getBytes()); + PortNumber[] ports = {PortNumber.portNumber(31), + PortNumber.portNumber(32)}; + List<PortNumber> outPorts = new ArrayList<PortNumber>(); + outPorts.add(ports[0]); + outPorts.add(ports[1]); + + List<GroupBucket> buckets = new ArrayList<GroupBucket>(); + for (PortNumber portNumber: outPorts) { + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); + tBuilder.setOutput(portNumber) + .setEthDst(MacAddress.valueOf("00:00:00:00:00:02")) + .setEthSrc(MacAddress.valueOf("00:00:00:00:00:01")) + .pushMpls() + .setMpls(MplsLabel.mplsLabel(106)); + 
buckets.add(DefaultGroupBucket.createSelectGroupBucket( + tBuilder.build())); + } + GroupBuckets groupBuckets = new GroupBuckets(buckets); + GroupDescription groupDesc = new DefaultGroupDescription( + D1, + Group.Type.SELECT, + groupBuckets, + key, + null, + appId); + InternalGroupStoreDelegate checkStoreGroupDelegate = + new InternalGroupStoreDelegate(key, + groupBuckets, + GroupEvent.Type.GROUP_ADD_REQUESTED); + simpleGroupStore.setDelegate(checkStoreGroupDelegate); + // Testing storeGroup operation + simpleGroupStore.storeGroupDescription(groupDesc); + simpleGroupStore.unsetDelegate(checkStoreGroupDelegate); + + // Testing Group add operation failure + Group createdGroup = simpleGroupStore.getGroup(D1, key); + checkStoreGroupDelegate.verifyGroupId(createdGroup.id()); + + GroupOperation groupAddOp = GroupOperation. + createAddGroupOperation(createdGroup.id(), + createdGroup.type(), + createdGroup.buckets()); + InternalGroupStoreDelegate checkGroupAddFailureDelegate = + new InternalGroupStoreDelegate(key, + groupBuckets, + GroupEvent.Type.GROUP_ADD_FAILED); + simpleGroupStore.setDelegate(checkGroupAddFailureDelegate); + simpleGroupStore.groupOperationFailed(D1, groupAddOp); + + + // Testing Group modify operation failure + simpleGroupStore.unsetDelegate(checkGroupAddFailureDelegate); + GroupOperation groupModOp = GroupOperation. + createModifyGroupOperation(createdGroup.id(), + createdGroup.type(), + createdGroup.buckets()); + InternalGroupStoreDelegate checkGroupModFailureDelegate = + new InternalGroupStoreDelegate(key, + groupBuckets, + GroupEvent.Type.GROUP_UPDATE_FAILED); + simpleGroupStore.setDelegate(checkGroupModFailureDelegate); + simpleGroupStore.groupOperationFailed(D1, groupModOp); + + // Testing Group modify operation failure + simpleGroupStore.unsetDelegate(checkGroupModFailureDelegate); + GroupOperation groupDelOp = GroupOperation. 
+ createDeleteGroupOperation(createdGroup.id(), + createdGroup.type()); + InternalGroupStoreDelegate checkGroupDelFailureDelegate = + new InternalGroupStoreDelegate(key, + groupBuckets, + GroupEvent.Type.GROUP_REMOVE_FAILED); + simpleGroupStore.setDelegate(checkGroupDelFailureDelegate); + simpleGroupStore.groupOperationFailed(D1, groupDelOp); + } +} + diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleHostStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleHostStore.java new file mode 100644 index 00000000..f5604f68 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleHostStore.java @@ -0,0 +1,293 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import static org.onosproject.net.DefaultAnnotations.merge; +import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED; +import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED; +import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED; +import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.Annotations; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.DefaultHost; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Host; +import org.onosproject.net.HostId; +import org.onosproject.net.HostLocation; +import org.onosproject.net.host.HostDescription; +import org.onosproject.net.host.HostEvent; +import org.onosproject.net.host.HostStore; +import org.onosproject.net.host.HostStoreDelegate; +import org.onosproject.net.host.PortAddresses; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.AbstractStore; +import org.onlab.packet.IpAddress; +import org.onlab.packet.MacAddress; +import org.onlab.packet.VlanId; +import org.slf4j.Logger; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; + +// TODO: multi-provider, annotation not supported. +/** + * Manages inventory of end-station hosts using trivial in-memory + * implementation. 
+ */ +@Component(immediate = true) +@Service +public class SimpleHostStore + extends AbstractStore<HostEvent, HostStoreDelegate> + implements HostStore { + + private final Logger log = getLogger(getClass()); + + // Host inventory + private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16); + + // Hosts tracked by their location + private final Multimap<ConnectPoint, Host> locations = HashMultimap.create(); + + private final SetMultimap<ConnectPoint, PortAddresses> portAddresses = + Multimaps.synchronizedSetMultimap( + HashMultimap.<ConnectPoint, PortAddresses>create()); + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId, + HostDescription hostDescription) { + StoredHost host = hosts.get(hostId); + if (host == null) { + return createHost(providerId, hostId, hostDescription); + } + return updateHost(providerId, host, hostDescription); + } + + // creates a new host and sends HOST_ADDED + private HostEvent createHost(ProviderId providerId, HostId hostId, + HostDescription descr) { + StoredHost newhost = new StoredHost(providerId, hostId, + descr.hwAddress(), + descr.vlan(), + descr.location(), + ImmutableSet.copyOf(descr.ipAddress()), + descr.annotations()); + synchronized (this) { + hosts.put(hostId, newhost); + locations.put(descr.location(), newhost); + } + return new HostEvent(HOST_ADDED, newhost); + } + + // checks for type of update to host, sends appropriate event + private HostEvent updateHost(ProviderId providerId, StoredHost host, + HostDescription descr) { + HostEvent event; + if (!host.location().equals(descr.location())) { + host.setLocation(descr.location()); + return new HostEvent(HOST_MOVED, host); + } + + if (host.ipAddresses().containsAll(descr.ipAddress()) && + descr.annotations().keys().isEmpty()) { + return null; + } + + Set<IpAddress> 
addresses = new HashSet<>(host.ipAddresses()); + addresses.addAll(descr.ipAddress()); + Annotations annotations = merge((DefaultAnnotations) host.annotations(), + descr.annotations()); + StoredHost updated = new StoredHost(providerId, host.id(), + host.mac(), host.vlan(), + descr.location(), addresses, + annotations); + event = new HostEvent(HOST_UPDATED, updated); + synchronized (this) { + hosts.put(host.id(), updated); + locations.remove(host.location(), host); + locations.put(updated.location(), updated); + } + return event; + } + + @Override + public HostEvent removeHost(HostId hostId) { + synchronized (this) { + Host host = hosts.remove(hostId); + if (host != null) { + locations.remove((host.location()), host); + return new HostEvent(HOST_REMOVED, host); + } + return null; + } + } + + @Override + public int getHostCount() { + return hosts.size(); + } + + @Override + public Iterable<Host> getHosts() { + return ImmutableSet.<Host>copyOf(hosts.values()); + } + + @Override + public Host getHost(HostId hostId) { + return hosts.get(hostId); + } + + @Override + public Set<Host> getHosts(VlanId vlanId) { + Set<Host> vlanset = new HashSet<>(); + for (Host h : hosts.values()) { + if (h.vlan().equals(vlanId)) { + vlanset.add(h); + } + } + return vlanset; + } + + @Override + public Set<Host> getHosts(MacAddress mac) { + Set<Host> macset = new HashSet<>(); + for (Host h : hosts.values()) { + if (h.mac().equals(mac)) { + macset.add(h); + } + } + return macset; + } + + @Override + public Set<Host> getHosts(IpAddress ip) { + Set<Host> ipset = new HashSet<>(); + for (Host h : hosts.values()) { + if (h.ipAddresses().contains(ip)) { + ipset.add(h); + } + } + return ipset; + } + + @Override + public Set<Host> getConnectedHosts(ConnectPoint connectPoint) { + return ImmutableSet.copyOf(locations.get(connectPoint)); + } + + @Override + public Set<Host> getConnectedHosts(DeviceId deviceId) { + Set<Host> hostset = new HashSet<>(); + for (ConnectPoint p : locations.keySet()) { + if 
(p.deviceId().equals(deviceId)) { + hostset.addAll(locations.get(p)); + } + } + return hostset; + } + + @Override + public void updateAddressBindings(PortAddresses addresses) { + portAddresses.put(addresses.connectPoint(), addresses); + } + + @Override + public void removeAddressBindings(PortAddresses addresses) { + portAddresses.remove(addresses.connectPoint(), addresses); + } + + @Override + public void clearAddressBindings(ConnectPoint connectPoint) { + portAddresses.removeAll(connectPoint); + } + + @Override + public Set<PortAddresses> getAddressBindings() { + synchronized (portAddresses) { + return ImmutableSet.copyOf(portAddresses.values()); + } + } + + @Override + public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) { + synchronized (portAddresses) { + Set<PortAddresses> addresses = portAddresses.get(connectPoint); + + if (addresses == null) { + return Collections.emptySet(); + } else { + return ImmutableSet.copyOf(addresses); + } + } + } + + // Auxiliary extension to allow location to mutate. + private static final class StoredHost extends DefaultHost { + private HostLocation location; + + /** + * Creates an end-station host using the supplied information. + * + * @param providerId provider identity + * @param id host identifier + * @param mac host MAC address + * @param vlan host VLAN identifier + * @param location host location + * @param ips host IP addresses + * @param annotations optional key/value annotations + */ + public StoredHost(ProviderId providerId, HostId id, + MacAddress mac, VlanId vlan, HostLocation location, + Set<IpAddress> ips, Annotations... 
annotations) { + super(providerId, id, mac, vlan, location, ips, annotations); + this.location = location; + } + + void setLocation(HostLocation location) { + this.location = location; + } + + @Override + public HostLocation location() { + return location; + } + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleIdBlockStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleIdBlockStore.java new file mode 100644 index 00000000..3f7e563a --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleIdBlockStore.java @@ -0,0 +1,48 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.core.IdBlock; +import org.onosproject.core.IdBlockStore; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simple implementation of id block store. 
+ */ +@Component(immediate = true) +@Service +public class SimpleIdBlockStore implements IdBlockStore { + + private static final long DEFAULT_BLOCK_SIZE = 0x1000L; + + private final Map<String, AtomicLong> topicBlocks = new ConcurrentHashMap<>(); + + @Override + public synchronized IdBlock getIdBlock(String topic) { + AtomicLong blockGenerator = topicBlocks.get(topic); + if (blockGenerator == null) { + blockGenerator = new AtomicLong(0); + topicBlocks.put(topic, blockGenerator); + } + Long blockBase = blockGenerator.getAndAdd(DEFAULT_BLOCK_SIZE); + return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java new file mode 100644 index 00000000..9f959663 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java @@ -0,0 +1,212 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.intent.Intent; +import org.onosproject.net.intent.IntentData; +import org.onosproject.net.intent.IntentEvent; +import org.onosproject.net.intent.IntentState; +import org.onosproject.net.intent.IntentStore; +import org.onosproject.net.intent.IntentStoreDelegate; +import org.onosproject.net.intent.Key; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.onosproject.net.intent.IntentState.PURGE_REQ; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Simple single-instance implementation of the intent store. 
+ */ +@Component(immediate = true) +@Service +public class SimpleIntentStore + extends AbstractStore<IntentEvent, IntentStoreDelegate> + implements IntentStore { + + private final Logger log = getLogger(getClass()); + + private final Map<Key, IntentData> current = Maps.newConcurrentMap(); + private final Map<Key, IntentData> pending = Maps.newConcurrentMap(); + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public long getIntentCount() { + return current.size(); + } + + @Override + public Iterable<Intent> getIntents() { + return current.values().stream() + .map(IntentData::intent) + .collect(Collectors.toList()); + } + + @Override + public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) { + if (localOnly || olderThan > 0) { + long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns + final SystemClockTimestamp time = new SystemClockTimestamp(older); + return current.values().stream() + .filter(data -> data.version().isOlderThan(time) && + (!localOnly || isMaster(data.key()))) + .collect(Collectors.toList()); + } + return Lists.newArrayList(current.values()); + } + + @Override + public IntentState getIntentState(Key intentKey) { + IntentData data = current.get(intentKey); + return (data != null) ? 
data.state() : null; + } + + @Override + public List<Intent> getInstallableIntents(Key intentKey) { + IntentData data = current.get(intentKey); + if (data != null) { + return data.installables(); + } + return null; + } + + @Override + public void write(IntentData newData) { + checkNotNull(newData); + + synchronized (this) { + // TODO this could be refactored/cleaned up + IntentData currentData = current.get(newData.key()); + IntentData pendingData = pending.get(newData.key()); + + if (IntentData.isUpdateAcceptable(currentData, newData)) { + if (pendingData != null) { + if (pendingData.state() == PURGE_REQ) { + current.remove(newData.key(), newData); + } else { + current.put(newData.key(), new IntentData(newData)); + } + + if (pendingData.version().compareTo(newData.version()) <= 0) { + // pendingData version is less than or equal to newData's + // Note: a new update for this key could be pending (it's version will be greater) + pending.remove(newData.key()); + } + } + notifyDelegateIfNotNull(IntentEvent.getEvent(newData)); + } + } + } + + private void notifyDelegateIfNotNull(IntentEvent event) { + if (event != null) { + notifyDelegate(event); + } + } + + @Override + public void batchWrite(Iterable<IntentData> updates) { + for (IntentData data : updates) { + write(data); + } + } + + @Override + public Intent getIntent(Key key) { + IntentData data = current.get(key); + return (data != null) ? 
data.intent() : null; + } + + @Override + public IntentData getIntentData(Key key) { + IntentData currentData = current.get(key); + if (currentData == null) { + return null; + } + return new IntentData(currentData); + } + + @Override + public void addPending(IntentData data) { + if (data.version() == null) { // recompiled intents will already have a version + data.setVersion(new SystemClockTimestamp()); + } + synchronized (this) { + IntentData existingData = pending.get(data.key()); + if (existingData == null || + // existing version is strictly less than data's version + // Note: if they are equal, we already have the update + // TODO maybe we should still make this <= to be safe? + existingData.version().compareTo(data.version()) < 0) { + pending.put(data.key(), data); + checkNotNull(delegate, "Store delegate is not set") + .process(new IntentData(data)); + notifyDelegateIfNotNull(IntentEvent.getEvent(data)); + } else { + log.debug("IntentData {} is older than existing: {}", + data, existingData); + } + //TODO consider also checking the current map at this point + } + } + + @Override + public boolean isMaster(Key intentKey) { + return true; + } + + @Override + public Iterable<Intent> getPending() { + return pending.values().stream() + .map(IntentData::intent) + .collect(Collectors.toList()); + } + + @Override + public Iterable<IntentData> getPendingData() { + return Lists.newArrayList(pending.values()); + } + + @Override + public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) { + long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns + final SystemClockTimestamp time = new SystemClockTimestamp(older); + return pending.values().stream() + .filter(data -> data.version().isOlderThan(time) && + (!localOnly || isMaster(data.key()))) + .collect(Collectors.toList()); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java 
b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java new file mode 100644 index 00000000..194ffec1 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java @@ -0,0 +1,135 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.stream.Collectors; + +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.Leadership; +import org.onosproject.cluster.LeadershipEvent; +import org.onosproject.cluster.LeadershipEvent.Type; +import org.onosproject.cluster.LeadershipEventListener; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; + +/** + * A trivial implementation of the leadership service. 
+ * <p> + * The service is not distributed, so it can assume there's a single leadership + * contender. This contender is always granted leadership whenever it asks. + */ +@Component(immediate = true) +@Service +public class SimpleLeadershipManager implements LeadershipService { + + private Set<LeadershipEventListener> listeners = new CopyOnWriteArraySet<>(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + private ClusterService clusterService; + + private Map<String, Boolean> elections = new ConcurrentHashMap<>(); + + @Override + public NodeId getLeader(String path) { + return elections.get(path) ? clusterService.getLocalNode().id() : null; + } + + @Override + public Leadership getLeadership(String path) { + checkArgument(path != null); + return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0, 0) : null; + } + + @Override + public Set<String> ownedTopics(NodeId nodeId) { + checkArgument(nodeId != null); + return elections.entrySet() + .stream() + .filter(Entry::getValue) + .map(Entry::getKey) + .collect(Collectors.toSet()); + } + + @Override + public CompletableFuture<Leadership> runForLeadership(String path) { + elections.put(path, true); + for (LeadershipEventListener listener : listeners) { + listener.event(new LeadershipEvent(Type.LEADER_ELECTED, + new Leadership(path, clusterService.getLocalNode().id(), 0, 0))); + } + return CompletableFuture.completedFuture(new Leadership(path, clusterService.getLocalNode().id(), 0, 0)); + } + + @Override + public CompletableFuture<Void> withdraw(String path) { + elections.remove(path); + for (LeadershipEventListener listener : listeners) { + listener.event(new LeadershipEvent(Type.LEADER_BOOTED, + new Leadership(path, clusterService.getLocalNode().id(), 0, 0))); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public Map<String, Leadership> getLeaderBoard() { + //FIXME + throw new UnsupportedOperationException("I don't know what to do." 
+ + " I wish you luck."); + } + + @Override + public void addListener(LeadershipEventListener listener) { + listeners.add(listener); + } + + @Override + public void removeListener(LeadershipEventListener listener) { + listeners.remove(listener); + } + + @Override + public Map<String, List<NodeId>> getCandidates() { + return null; + } + + @Override + public List<NodeId> getCandidates(String path) { + return null; + } + + @Override + public boolean stepdown(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean makeTopCandidate(String path, NodeId nodeId) { + throw new UnsupportedOperationException(); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkResourceStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkResourceStore.java new file mode 100644 index 00000000..58b446cf --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkResourceStore.java @@ -0,0 +1,286 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.Bandwidth; +import org.onlab.util.PositionalParameterStringFormatter; +import org.onosproject.net.AnnotationKeys; +import org.onosproject.net.Annotations; +import org.onosproject.net.Link; +import org.onosproject.net.intent.IntentId; +import org.onosproject.net.resource.link.BandwidthResource; +import org.onosproject.net.resource.link.BandwidthResourceAllocation; +import org.onosproject.net.resource.link.LambdaResource; +import org.onosproject.net.resource.link.LambdaResourceAllocation; +import org.onosproject.net.resource.link.LinkResourceAllocations; +import org.onosproject.net.resource.link.LinkResourceEvent; +import org.onosproject.net.resource.link.LinkResourceStore; +import org.onosproject.net.resource.ResourceAllocation; +import org.onosproject.net.resource.ResourceAllocationException; +import org.onosproject.net.resource.ResourceType; +import org.slf4j.Logger; + +import com.google.common.collect.ImmutableList; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages link resources using trivial in-memory structures implementation. 
+ */ +@Component(immediate = true) +@Service +public class SimpleLinkResourceStore implements LinkResourceStore { + private static final BandwidthResource DEFAULT_BANDWIDTH = new BandwidthResource(Bandwidth.mbps(1_000)); + private final Logger log = getLogger(getClass()); + + private Map<IntentId, LinkResourceAllocations> linkResourceAllocationsMap; + private Map<Link, Set<LinkResourceAllocations>> allocatedResources; + private Map<Link, Set<ResourceAllocation>> freeResources; + + @Activate + public void activate() { + linkResourceAllocationsMap = new HashMap<>(); + allocatedResources = new HashMap<>(); + freeResources = new HashMap<>(); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + /** + * Returns free resources for a given link obtaining from topology + * information. + * + * @param link the target link + * @return free resources + */ + private synchronized Set<ResourceAllocation> readOriginalFreeResources(Link link) { + Annotations annotations = link.annotations(); + Set<ResourceAllocation> allocations = new HashSet<>(); + + try { + int waves = Integer.parseInt(annotations.value(AnnotationKeys.OPTICAL_WAVES)); + for (int i = 1; i <= waves; i++) { + allocations.add(new LambdaResourceAllocation(LambdaResource.valueOf(i))); + } + } catch (NumberFormatException e) { + log.debug("No optical.wave annotation on link %s", link); + } + + BandwidthResource bandwidth = DEFAULT_BANDWIDTH; + try { + bandwidth = new BandwidthResource( + Bandwidth.mbps((Double.parseDouble(annotations.value(AnnotationKeys.BANDWIDTH))))); + } catch (NumberFormatException e) { + log.debug("No bandwidth annotation on link %s", link); + } + allocations.add( + new BandwidthResourceAllocation(bandwidth)); + return allocations; + } + + /** + * Finds and returns {@link BandwidthResourceAllocation} object from a given + * set. + * + * @param freeRes a set of ResourceAllocation object. 
+ * @return {@link BandwidthResourceAllocation} object if found, otherwise + * {@link BandwidthResourceAllocation} object with 0 bandwidth + * + */ + private synchronized BandwidthResourceAllocation getBandwidth( + Set<ResourceAllocation> freeRes) { + for (ResourceAllocation res : freeRes) { + if (res.type() == ResourceType.BANDWIDTH) { + return (BandwidthResourceAllocation) res; + } + } + return new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(0))); + } + + /** + * Subtracts given resources from free resources for given link. + * + * @param link the target link + * @param allocations the resources to be subtracted + */ + private synchronized void subtractFreeResources(Link link, + LinkResourceAllocations allocations) { + // TODO Use lock or version for updating freeResources. + checkNotNull(link); + Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link)); + Set<ResourceAllocation> subRes = allocations.getResourceAllocation(link); + for (ResourceAllocation res : subRes) { + switch (res.type()) { + case BANDWIDTH: + BandwidthResourceAllocation ba = getBandwidth(freeRes); + double requestedBandwidth = + ((BandwidthResourceAllocation) res).bandwidth().toDouble(); + double newBandwidth = ba.bandwidth().toDouble() - requestedBandwidth; + if (newBandwidth < 0.0) { + throw new ResourceAllocationException( + PositionalParameterStringFormatter.format( + "Unable to allocate bandwidth for link {} " + + "requested amount is {} current allocation is {}", + link, + requestedBandwidth, + ba)); + } + freeRes.remove(ba); + freeRes.add(new BandwidthResourceAllocation( + new BandwidthResource(Bandwidth.bps(newBandwidth)))); + break; + case LAMBDA: + final boolean lambdaAvailable = freeRes.remove(res); + if (!lambdaAvailable) { + int requestedLambda = + ((LambdaResourceAllocation) res).lambda().toInt(); + throw new ResourceAllocationException( + PositionalParameterStringFormatter.format( + "Unable to allocate lambda for link {} lambda is {}", + link, 
+ requestedLambda)); + } + break; + default: + break; + } + } + freeResources.put(link, freeRes); + + } + + /** + * Adds given resources to free resources for given link. + * + * @param link the target link + * @param allocations the resources to be added + */ + private synchronized void addFreeResources(Link link, + LinkResourceAllocations allocations) { + // TODO Use lock or version for updating freeResources. + Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link)); + Set<ResourceAllocation> addRes = allocations.getResourceAllocation(link); + for (ResourceAllocation res : addRes) { + switch (res.type()) { + case BANDWIDTH: + BandwidthResourceAllocation ba = getBandwidth(freeRes); + double requestedBandwidth = + ((BandwidthResourceAllocation) res).bandwidth().toDouble(); + double newBandwidth = ba.bandwidth().toDouble() + requestedBandwidth; + freeRes.remove(ba); + freeRes.add(new BandwidthResourceAllocation( + new BandwidthResource(Bandwidth.bps(newBandwidth)))); + break; + case LAMBDA: + checkState(freeRes.add(res)); + break; + default: + break; + } + } + freeResources.put(link, freeRes); + } + + @Override + public synchronized Set<ResourceAllocation> getFreeResources(Link link) { + checkNotNull(link); + Set<ResourceAllocation> freeRes = freeResources.get(link); + if (freeRes == null) { + freeRes = readOriginalFreeResources(link); + } + + return freeRes; + } + + @Override + public synchronized void allocateResources(LinkResourceAllocations allocations) { + checkNotNull(allocations); + linkResourceAllocationsMap.put(allocations.intentId(), allocations); + for (Link link : allocations.links()) { + subtractFreeResources(link, allocations); + Set<LinkResourceAllocations> linkAllocs = allocatedResources.get(link); + if (linkAllocs == null) { + linkAllocs = new HashSet<>(); + } + linkAllocs.add(allocations); + allocatedResources.put(link, linkAllocs); + } + } + + @Override + public synchronized LinkResourceEvent 
releaseResources(LinkResourceAllocations allocations) { + checkNotNull(allocations); + linkResourceAllocationsMap.remove(allocations.intentId()); + for (Link link : allocations.links()) { + addFreeResources(link, allocations); + Set<LinkResourceAllocations> linkAllocs = allocatedResources.get(link); + if (linkAllocs == null) { + log.error("Missing resource allocation."); + } else { + linkAllocs.remove(allocations); + } + allocatedResources.put(link, linkAllocs); + } + + final List<LinkResourceAllocations> releasedResources = + ImmutableList.of(allocations); + + return new LinkResourceEvent( + LinkResourceEvent.Type.ADDITIONAL_RESOURCES_AVAILABLE, + releasedResources); + } + + @Override + public synchronized LinkResourceAllocations getAllocations(IntentId intentId) { + checkNotNull(intentId); + return linkResourceAllocationsMap.get(intentId); + } + + @Override + public synchronized Iterable<LinkResourceAllocations> getAllocations(Link link) { + checkNotNull(link); + Set<LinkResourceAllocations> result = allocatedResources.get(link); + if (result == null) { + result = Collections.emptySet(); + } + return Collections.unmodifiableSet(result); + } + + @Override + public synchronized Iterable<LinkResourceAllocations> getAllocations() { + return Collections.unmodifiableCollection(linkResourceAllocationsMap.values()); + } + + +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkResourceStoreTest.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkResourceStoreTest.java new file mode 100644 index 00000000..238e75d0 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkResourceStoreTest.java @@ -0,0 +1,307 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.util.Bandwidth; +import org.onosproject.net.AnnotationKeys; +import org.onosproject.net.Annotations; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.DefaultLink; +import org.onosproject.net.Link; +import org.onosproject.net.intent.IntentId; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.net.resource.link.BandwidthResource; +import org.onosproject.net.resource.link.BandwidthResourceAllocation; +import org.onosproject.net.resource.link.LambdaResource; +import org.onosproject.net.resource.link.LambdaResourceAllocation; +import org.onosproject.net.resource.link.LinkResourceAllocations; +import org.onosproject.net.resource.link.LinkResourceStore; +import org.onosproject.net.resource.ResourceAllocation; +import org.onosproject.net.resource.ResourceAllocationException; +import org.onosproject.net.resource.ResourceRequest; +import org.onosproject.net.resource.ResourceType; + +import com.google.common.collect.ImmutableSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.onosproject.net.DeviceId.deviceId; +import static org.onosproject.net.Link.Type.DIRECT; +import static org.onosproject.net.PortNumber.portNumber; + +/** + * Test of the 
simple LinkResourceStore implementation. + */ +public class SimpleLinkResourceStoreTest { + + private LinkResourceStore store; + private SimpleLinkResourceStore simpleStore; + private Link link1; + private Link link2; + private Link link3; + + /** + * Returns {@link Link} object. + * + * @param dev1 source device + * @param port1 source port + * @param dev2 destination device + * @param port2 destination port + * @return created {@link Link} object + */ + private static Link newLink(String dev1, int port1, String dev2, int port2) { + Annotations annotations = DefaultAnnotations.builder() + .set(AnnotationKeys.OPTICAL_WAVES, "80") + .set(AnnotationKeys.BANDWIDTH, "1000") + .build(); + return new DefaultLink( + new ProviderId("of", "foo"), + new ConnectPoint(deviceId(dev1), portNumber(port1)), + new ConnectPoint(deviceId(dev2), portNumber(port2)), + DIRECT, annotations); + } + + @Before + public void setUp() throws Exception { + simpleStore = new SimpleLinkResourceStore(); + simpleStore.activate(); + store = simpleStore; + + link1 = newLink("of:1", 1, "of:2", 2); + link2 = newLink("of:2", 1, "of:3", 2); + link3 = newLink("of:3", 1, "of:4", 2); + } + + @After + public void tearDown() throws Exception { + simpleStore.deactivate(); + } + + /** + * Tests constructor and activate method. + */ + @Test + public void testConstructorAndActivate() { + final Iterable<LinkResourceAllocations> allAllocations = store.getAllocations(); + assertNotNull(allAllocations); + assertFalse(allAllocations.iterator().hasNext()); + + final Iterable<LinkResourceAllocations> linkAllocations = + store.getAllocations(link1); + assertNotNull(linkAllocations); + assertFalse(linkAllocations.iterator().hasNext()); + + final Set<ResourceAllocation> res = store.getFreeResources(link2); + assertNotNull(res); + } + + /** + * Picks up and returns one of bandwidth allocations from a given set. 
+ * + * @param resources the set of {@link ResourceAllocation}s + * @return {@link BandwidthResourceAllocation} object if found, null + * otherwise + */ + private BandwidthResourceAllocation getBandwidthObj(Set<ResourceAllocation> resources) { + for (ResourceAllocation res : resources) { + if (res.type() == ResourceType.BANDWIDTH) { + return ((BandwidthResourceAllocation) res); + } + } + return null; + } + + /** + * Returns all lambda allocations from a given set. + * + * @param resources the set of {@link ResourceAllocation}s + * @return a set of {@link LambdaResourceAllocation} objects + */ + private Set<LambdaResourceAllocation> getLambdaObjs(Set<ResourceAllocation> resources) { + Set<LambdaResourceAllocation> lambdaResources = new HashSet<>(); + for (ResourceAllocation res : resources) { + if (res.type() == ResourceType.LAMBDA) { + lambdaResources.add((LambdaResourceAllocation) res); + } + } + return lambdaResources; + } + + /** + * Tests initial free bandwidth for a link. + */ + @Test + public void testInitialBandwidth() { + final Set<ResourceAllocation> freeRes = store.getFreeResources(link1); + assertNotNull(freeRes); + + final BandwidthResourceAllocation alloc = getBandwidthObj(freeRes); + assertNotNull(alloc); + + assertEquals(new BandwidthResource(Bandwidth.mbps(1000.0)), alloc.bandwidth()); + } + + /** + * Tests initial free lambda for a link. 
+ */ + @Test + public void testInitialLambdas() { + final Set<ResourceAllocation> freeRes = store.getFreeResources(link3); + assertNotNull(freeRes); + + final Set<LambdaResourceAllocation> res = getLambdaObjs(freeRes); + assertNotNull(res); + assertEquals(80, res.size()); + } + + public static class MockLinkResourceBandwidthAllocations implements LinkResourceAllocations { + final double allocationAmount; + + MockLinkResourceBandwidthAllocations(Double allocationAmount) { + this.allocationAmount = allocationAmount; + } + @Override + public Set<ResourceAllocation> getResourceAllocation(Link link) { + final ResourceAllocation allocation = + new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(allocationAmount))); + final Set<ResourceAllocation> allocations = new HashSet<>(); + allocations.add(allocation); + return allocations; + } + + @Override + public IntentId intentId() { + return null; + } + + @Override + public Collection<Link> links() { + return ImmutableSet.of(newLink("of:1", 1, "of:2", 2)); + } + + @Override + public Set<ResourceRequest> resources() { + return null; + } + + @Override + public ResourceType type() { + return null; + } + } + + public static class MockLinkResourceLambdaAllocations implements LinkResourceAllocations { + final int allocatedLambda; + + MockLinkResourceLambdaAllocations(int allocatedLambda) { + this.allocatedLambda = allocatedLambda; + } + @Override + public Set<ResourceAllocation> getResourceAllocation(Link link) { + final ResourceAllocation allocation = + new LambdaResourceAllocation(LambdaResource.valueOf(allocatedLambda)); + final Set<ResourceAllocation> allocations = new HashSet<>(); + allocations.add(allocation); + return allocations; + } + + @Override + public IntentId intentId() { + return null; + } + + @Override + public Collection<Link> links() { + return ImmutableSet.of(newLink("of:1", 1, "of:2", 2)); + } + + @Override + public Set<ResourceRequest> resources() { + return null; + } + + @Override + public 
ResourceType type() { + return null; + } + } + + /** + * Tests a successful bandwidth allocation. + */ + @Test + public void testSuccessfulBandwidthAllocation() { + final LinkResourceAllocations allocations = + new MockLinkResourceBandwidthAllocations(900.0); + store.allocateResources(allocations); + } + + /** + * Tests an unsuccessful bandwidth allocation. + */ + @Test + public void testUnsuccessfulBandwidthAllocation() { + final LinkResourceAllocations allocations = + new MockLinkResourceBandwidthAllocations(2000000000.0); + boolean gotException = false; + try { + store.allocateResources(allocations); + } catch (ResourceAllocationException rae) { + assertEquals(true, rae.getMessage().contains("Unable to allocate bandwidth for link")); + gotException = true; + } + assertEquals(true, gotException); + } + + /** + * Tests a successful lambda allocation. + */ + @Test + public void testSuccessfulLambdaAllocation() { + final LinkResourceAllocations allocations = + new MockLinkResourceLambdaAllocations(1); + store.allocateResources(allocations); + } + + /** + * Tests an unsuccessful lambda allocation. 
+ */ + @Test + public void testUnsuccessfulLambdaAllocation() { + final LinkResourceAllocations allocations = + new MockLinkResourceLambdaAllocations(1); + store.allocateResources(allocations); + + boolean gotException = false; + + try { + store.allocateResources(allocations); + } catch (ResourceAllocationException rae) { + assertEquals(true, rae.getMessage().contains("Unable to allocate lambda for link")); + gotException = true; + } + assertEquals(true, gotException); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkStore.java new file mode 100644 index 00000000..d0be2b1f --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkStore.java @@ -0,0 +1,366 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.AnnotationKeys; +import org.onosproject.net.AnnotationsUtil; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.DefaultLink; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Link; +import org.onosproject.net.Link.Type; +import org.onosproject.net.LinkKey; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.link.DefaultLinkDescription; +import org.onosproject.net.link.LinkDescription; +import org.onosproject.net.link.LinkEvent; +import org.onosproject.net.link.LinkStore; +import org.onosproject.net.link.LinkStoreDelegate; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.google.common.base.Predicates.notNull; +import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.collect.Multimaps.synchronizedSetMultimap; +import static org.onosproject.net.DefaultAnnotations.merge; +import static org.onosproject.net.DefaultAnnotations.union; +import static org.onosproject.net.Link.State.ACTIVE; +import static org.onosproject.net.Link.State.INACTIVE; 
+import static org.onosproject.net.Link.Type.DIRECT; +import static org.onosproject.net.Link.Type.INDIRECT; +import static org.onosproject.net.LinkKey.linkKey; +import static org.onosproject.net.link.LinkEvent.Type.*; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages inventory of infrastructure links using trivial in-memory structures + * implementation. + */ +@Component(immediate = true) +@Service +public class SimpleLinkStore + extends AbstractStore<LinkEvent, LinkStoreDelegate> + implements LinkStore { + + private final Logger log = getLogger(getClass()); + + // Link inventory + private final ConcurrentMap<LinkKey, Map<ProviderId, LinkDescription>> + linkDescs = new ConcurrentHashMap<>(); + + // Link instance cache + private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>(); + + // Egress and ingress link sets + private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap(); + private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap(); + + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + linkDescs.clear(); + links.clear(); + srcLinks.clear(); + dstLinks.clear(); + log.info("Stopped"); + } + + @Override + public int getLinkCount() { + return links.size(); + } + + @Override + public Iterable<Link> getLinks() { + return Collections.unmodifiableCollection(links.values()); + } + + @Override + public Set<Link> getDeviceEgressLinks(DeviceId deviceId) { + // lock for iteration + synchronized (srcLinks) { + return FluentIterable.from(srcLinks.get(deviceId)) + .transform(lookupLink()) + .filter(notNull()) + .toSet(); + } + } + + @Override + public Set<Link> getDeviceIngressLinks(DeviceId deviceId) { + // lock for iteration + synchronized (dstLinks) { + return FluentIterable.from(dstLinks.get(deviceId)) + .transform(lookupLink()) + .filter(notNull()) + .toSet(); + } + } + + @Override + public Link getLink(ConnectPoint 
src, ConnectPoint dst) { + return links.get(linkKey(src, dst)); + } + + @Override + public Set<Link> getEgressLinks(ConnectPoint src) { + Set<Link> egress = new HashSet<>(); + synchronized (srcLinks) { + for (LinkKey linkKey : srcLinks.get(src.deviceId())) { + if (linkKey.src().equals(src)) { + egress.add(links.get(linkKey)); + } + } + } + return egress; + } + + @Override + public Set<Link> getIngressLinks(ConnectPoint dst) { + Set<Link> ingress = new HashSet<>(); + synchronized (dstLinks) { + for (LinkKey linkKey : dstLinks.get(dst.deviceId())) { + if (linkKey.dst().equals(dst)) { + ingress.add(links.get(linkKey)); + } + } + } + return ingress; + } + + @Override + public LinkEvent createOrUpdateLink(ProviderId providerId, + LinkDescription linkDescription) { + LinkKey key = linkKey(linkDescription.src(), linkDescription.dst()); + + Map<ProviderId, LinkDescription> descs = getOrCreateLinkDescriptions(key); + synchronized (descs) { + final Link oldLink = links.get(key); + // update description + createOrUpdateLinkDescription(descs, providerId, linkDescription); + final Link newLink = composeLink(descs); + if (oldLink == null) { + return createLink(key, newLink); + } + return updateLink(key, oldLink, newLink); + } + } + + @Override + public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) { + Link link = getLink(src, dst); + if (link == null) { + return null; + } + + if (link.isDurable()) { + return link.state() == INACTIVE ? 
null : + updateLink(linkKey(link.src(), link.dst()), link, + new DefaultLink(link.providerId(), + link.src(), link.dst(), + link.type(), INACTIVE, + link.isDurable(), + link.annotations())); + } + return removeLink(src, dst); + } + + // Guarded by linkDescs value (=locking each Link) + private LinkDescription createOrUpdateLinkDescription( + Map<ProviderId, LinkDescription> descs, + ProviderId providerId, + LinkDescription linkDescription) { + + // merge existing attributes and merge + LinkDescription oldDesc = descs.get(providerId); + LinkDescription newDesc = linkDescription; + if (oldDesc != null) { + // we only allow transition from INDIRECT -> DIRECT + final Type newType; + if (oldDesc.type() == DIRECT) { + newType = DIRECT; + } else { + newType = linkDescription.type(); + } + SparseAnnotations merged = union(oldDesc.annotations(), + linkDescription.annotations()); + newDesc = new DefaultLinkDescription(linkDescription.src(), + linkDescription.dst(), + newType, merged); + } + return descs.put(providerId, newDesc); + } + + // Creates and stores the link and returns the appropriate event. + // Guarded by linkDescs value (=locking each Link) + private LinkEvent createLink(LinkKey key, Link newLink) { + links.put(key, newLink); + srcLinks.put(newLink.src().deviceId(), key); + dstLinks.put(newLink.dst().deviceId(), key); + return new LinkEvent(LINK_ADDED, newLink); + } + + // Updates, if necessary the specified link and returns the appropriate event. 
+    // Guarded by linkDescs value (=locking each Link)
+    private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
+        if (oldLink.state() != newLink.state() ||
+            (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
+            !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
+
+            links.put(key, newLink);
+            // strictly speaking the following can be omitted
+            srcLinks.put(oldLink.src().deviceId(), key);
+            dstLinks.put(oldLink.dst().deviceId(), key);
+            return new LinkEvent(LINK_UPDATED, newLink);
+        }
+        return null;
+    }
+
+    @Override
+    public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
+        final LinkKey key = linkKey(src, dst);
+        Map<ProviderId, LinkDescription> descs = getOrCreateLinkDescriptions(key);
+        synchronized (descs) {
+            Link link = links.remove(key);
+            descs.clear();
+            if (link != null) {
+                srcLinks.remove(link.src().deviceId(), key);
+                dstLinks.remove(link.dst().deviceId(), key);
+                return new LinkEvent(LINK_REMOVED, link);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Creates a concurrently readable, synchronized HashMultimap.
+     *
+     * @return SetMultimap
+     */
+    private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
+        return synchronizedSetMultimap(
+                Multimaps.newSetMultimap(new ConcurrentHashMap<K, Collection<V>>(),
+                                         () -> Sets.newConcurrentHashSet()));
+    }
+
+    /**
+     * Returns the primary provider ID for the given descriptions, falling back
+     * to an arbitrarily chosen ancillary provider when no primary exists.
+     *
+     * @return primary ProviderId, or an arbitrarily chosen one if none exists
+     */
+    // Guarded by linkDescs value (=locking each Link)
+    private ProviderId getBaseProviderId(Map<ProviderId, LinkDescription> providerDescs) {
+
+        ProviderId fallBackPrimary = null;
+        for (Entry<ProviderId, LinkDescription> e : providerDescs.entrySet()) {
+            if (!e.getKey().isAncillary()) {
+                return e.getKey();
+            } else if (fallBackPrimary == null) {
+                // pick the first ancillary encountered as a fallback
+                // in case there is no primary
+                fallBackPrimary = e.getKey();
+            }
+        }
+        return fallBackPrimary;
+    }
+
+    // Guarded by linkDescs value (=locking each Link)
+    private Link composeLink(Map<ProviderId, LinkDescription> descs) {
+        ProviderId primary = getBaseProviderId(descs);
+        LinkDescription base = descs.get(verifyNotNull(primary));
+
+        ConnectPoint src = base.src();
+        ConnectPoint dst = base.dst();
+        Type type = base.type();
+        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+        annotations = merge(annotations, base.annotations());
+
+        for (Entry<ProviderId, LinkDescription> e : descs.entrySet()) {
+            if (primary.equals(e.getKey())) {
+                continue;
+            }
+
+            // TODO: should keep track of Description timestamp
+            // and only merge conflicting keys when timestamp is newer
+            // Currently assuming there will never be a key conflict between
+            // providers
+
+            // annotation merging.
not so efficient, should revisit later + annotations = merge(annotations, e.getValue().annotations()); + } + + boolean isDurable = Objects.equals(annotations.value(AnnotationKeys.DURABLE), "true"); + return new DefaultLink(primary, src, dst, type, ACTIVE, isDurable, annotations); + } + + private Map<ProviderId, LinkDescription> getOrCreateLinkDescriptions(LinkKey key) { + Map<ProviderId, LinkDescription> r; + r = linkDescs.get(key); + if (r != null) { + return r; + } + r = new HashMap<>(); + final Map<ProviderId, LinkDescription> concurrentlyAdded; + concurrentlyAdded = linkDescs.putIfAbsent(key, r); + if (concurrentlyAdded == null) { + return r; + } else { + return concurrentlyAdded; + } + } + + private final Function<LinkKey, Link> lookupLink = new LookupLink(); + + private Function<LinkKey, Link> lookupLink() { + return lookupLink; + } + + private final class LookupLink implements Function<LinkKey, Link> { + @Override + public Link apply(LinkKey input) { + if (input == null) { + return null; + } else { + return links.get(input); + } + } + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkStoreTest.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkStoreTest.java new file mode 100644 index 00000000..2d2b2759 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleLinkStoreTest.java @@ -0,0 +1,542 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import com.google.common.collect.Iterables; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.onosproject.net.AnnotationKeys; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Link; +import org.onosproject.net.Link.Type; +import org.onosproject.net.LinkKey; +import org.onosproject.net.PortNumber; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.link.DefaultLinkDescription; +import org.onosproject.net.link.LinkEvent; +import org.onosproject.net.link.LinkStore; +import org.onosproject.net.link.LinkStoreDelegate; +import org.onosproject.net.provider.ProviderId; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; +import static org.onosproject.net.DeviceId.deviceId; +import static org.onosproject.net.Link.Type.*; +import static org.onosproject.net.link.LinkEvent.Type.*; +import static org.onosproject.net.NetTestTools.assertAnnotationsEquals; + +/** + * Test of the simple LinkStore implementation. 
+ */ +public class SimpleLinkStoreTest { + + private static final ProviderId PID = new ProviderId("of", "foo"); + private static final ProviderId PIDA = new ProviderId("of", "bar", true); + private static final DeviceId DID1 = deviceId("of:foo"); + private static final DeviceId DID2 = deviceId("of:bar"); + + private static final PortNumber P1 = PortNumber.portNumber(1); + private static final PortNumber P2 = PortNumber.portNumber(2); + private static final PortNumber P3 = PortNumber.portNumber(3); + + private static final SparseAnnotations A1 = DefaultAnnotations.builder() + .set("A1", "a1") + .set("B1", "b1") + .build(); + private static final SparseAnnotations A1_2 = DefaultAnnotations.builder() + .remove("A1") + .set("B3", "b3") + .build(); + private static final SparseAnnotations A2 = DefaultAnnotations.builder() + .set("A2", "a2") + .set("B2", "b2") + .build(); + private static final SparseAnnotations A2_2 = DefaultAnnotations.builder() + .remove("A2") + .set("B4", "b4") + .build(); + + private static final SparseAnnotations DA1 = DefaultAnnotations.builder() + .set("A1", "a1") + .set("B1", "b1") + .set(AnnotationKeys.DURABLE, "true") + .build(); + private static final SparseAnnotations DA2 = DefaultAnnotations.builder() + .set("A2", "a2") + .set("B2", "b2") + .set(AnnotationKeys.DURABLE, "true") + .build(); + private static final SparseAnnotations NDA1 = DefaultAnnotations.builder() + .set("A1", "a1") + .set("B1", "b1") + .remove(AnnotationKeys.DURABLE) + .build(); + + + + private SimpleLinkStore simpleLinkStore; + private LinkStore linkStore; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setUp() throws Exception { + simpleLinkStore = new SimpleLinkStore(); + simpleLinkStore.activate(); + linkStore = simpleLinkStore; + } + + @After + public void tearDown() throws Exception { + simpleLinkStore.deactivate(); + } + + 
private void putLink(DeviceId srcId, PortNumber srcNum, + DeviceId dstId, PortNumber dstNum, + Type type, boolean isDurable, + SparseAnnotations... annotations) { + ConnectPoint src = new ConnectPoint(srcId, srcNum); + ConnectPoint dst = new ConnectPoint(dstId, dstNum); + linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, + annotations)); + } + + private void putLink(LinkKey key, Type type, SparseAnnotations... annotations) { + putLink(key.src().deviceId(), key.src().port(), + key.dst().deviceId(), key.dst().port(), + type, false, annotations); + } + + private static void assertLink(DeviceId srcId, PortNumber srcNum, + DeviceId dstId, PortNumber dstNum, Type type, + Link link) { + assertEquals(srcId, link.src().deviceId()); + assertEquals(srcNum, link.src().port()); + assertEquals(dstId, link.dst().deviceId()); + assertEquals(dstNum, link.dst().port()); + assertEquals(type, link.type()); + } + + private static void assertLink(LinkKey key, Type type, Link link) { + assertLink(key.src().deviceId(), key.src().port(), + key.dst().deviceId(), key.dst().port(), + type, link); + } + + @Test + public final void testGetLinkCount() { + assertEquals("initialy empty", 0, linkStore.getLinkCount()); + + putLink(DID1, P1, DID2, P2, DIRECT, false); + putLink(DID2, P2, DID1, P1, DIRECT, false); + putLink(DID1, P1, DID2, P2, DIRECT, false); + + assertEquals("expecting 2 unique link", 2, linkStore.getLinkCount()); + } + + @Test + public final void testGetLinks() { + assertEquals("initialy empty", 0, + Iterables.size(linkStore.getLinks())); + + LinkKey linkId1 = LinkKey.linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2)); + LinkKey linkId2 = LinkKey.linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId1, DIRECT); + + assertEquals("expecting 2 unique link", 2, + Iterables.size(linkStore.getLinks())); + + Map<LinkKey, Link> links = new HashMap<>(); + for (Link link 
: linkStore.getLinks()) { + links.put(LinkKey.linkKey(link), link); + } + + assertLink(linkId1, DIRECT, links.get(linkId1)); + assertLink(linkId2, DIRECT, links.get(linkId2)); + } + + @Test + public final void testGetDeviceEgressLinks() { + LinkKey linkId1 = LinkKey.linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2)); + LinkKey linkId2 = LinkKey.linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1)); + LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set<Link> links1 = linkStore.getDeviceEgressLinks(DID1); + assertEquals(2, links1.size()); + // check + + Set<Link> links2 = linkStore.getDeviceEgressLinks(DID2); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testGetDeviceIngressLinks() { + LinkKey linkId1 = LinkKey.linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2)); + LinkKey linkId2 = LinkKey.linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1)); + LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set<Link> links1 = linkStore.getDeviceIngressLinks(DID2); + assertEquals(2, links1.size()); + // check + + Set<Link> links2 = linkStore.getDeviceIngressLinks(DID1); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testGetLink() { + ConnectPoint src = new ConnectPoint(DID1, P1); + ConnectPoint dst = new ConnectPoint(DID2, P2); + LinkKey linkId1 = LinkKey.linkKey(src, dst); + + putLink(linkId1, DIRECT); + + Link link = linkStore.getLink(src, dst); + assertLink(linkId1, DIRECT, 
link); + + assertNull("There shouldn't be reverese link", + linkStore.getLink(dst, src)); + } + + @Test + public final void testGetEgressLinks() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2); + LinkKey linkId2 = LinkKey.linkKey(d2P2, d1P1); + LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set<Link> links1 = linkStore.getEgressLinks(d1P1); + assertEquals(1, links1.size()); + assertLink(linkId1, DIRECT, links1.iterator().next()); + + Set<Link> links2 = linkStore.getEgressLinks(d2P2); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testGetIngressLinks() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2); + LinkKey linkId2 = LinkKey.linkKey(d2P2, d1P1); + LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set<Link> links1 = linkStore.getIngressLinks(d2P2); + assertEquals(1, links1.size()); + assertLink(linkId1, DIRECT, links1.iterator().next()); + + Set<Link> links2 = linkStore.getIngressLinks(d1P1); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testCreateOrUpdateLink() { + ConnectPoint src = new ConnectPoint(DID1, P1); + ConnectPoint dst = new ConnectPoint(DID2, P2); + + // add link + LinkEvent event = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, INDIRECT)); + 
+ assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject()); + assertEquals(LINK_ADDED, event.type()); + + // update link type + LinkEvent event2 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, DIRECT)); + + assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject()); + assertEquals(LINK_UPDATED, event2.type()); + + // no change + LinkEvent event3 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, DIRECT)); + + assertNull("No change event expected", event3); + } + + @Test + public final void testCreateOrUpdateLinkAncillary() { + ConnectPoint src = new ConnectPoint(DID1, P1); + ConnectPoint dst = new ConnectPoint(DID2, P2); + + // add Ancillary link + LinkEvent event = linkStore.createOrUpdateLink(PIDA, + new DefaultLinkDescription(src, dst, INDIRECT, A1)); + + assertNotNull("Ancillary only link is ignored", event); + + // add Primary link + LinkEvent event2 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, INDIRECT, A2)); + + assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject()); + assertAnnotationsEquals(event2.subject().annotations(), A2, A1); + assertEquals(LINK_UPDATED, event2.type()); + + // update link type + LinkEvent event3 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, DIRECT, A2)); + assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject()); + assertAnnotationsEquals(event3.subject().annotations(), A2, A1); + assertEquals(LINK_UPDATED, event3.type()); + + + // no change + LinkEvent event4 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, DIRECT)); + assertNull("No change event expected", event4); + + // update link annotation (Primary) + LinkEvent event5 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, DIRECT, A2_2)); + assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject()); + assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1); + assertEquals(LINK_UPDATED, event5.type()); + + // 
update link annotation (Ancillary) + LinkEvent event6 = linkStore.createOrUpdateLink(PIDA, + new DefaultLinkDescription(src, dst, DIRECT, A1_2)); + assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject()); + assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2); + assertEquals(LINK_UPDATED, event6.type()); + + // update link type (Ancillary) : ignored + LinkEvent event7 = linkStore.createOrUpdateLink(PIDA, + new DefaultLinkDescription(src, dst, EDGE)); + assertNull("Ancillary change other than annotation is ignored", event7); + } + + + @Test + public final void testRemoveOrDownLink() { + removeOrDownLink(false); + } + + @Test + public final void testRemoveOrDownLinkDurable() { + removeOrDownLink(true); + } + + private void removeOrDownLink(boolean isDurable) { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2); + LinkKey linkId2 = LinkKey.linkKey(d2P2, d1P1); + + putLink(linkId1, DIRECT, isDurable ? DA1 : A1); + putLink(linkId2, DIRECT, isDurable ? DA2 : A2); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + LinkEvent event = linkStore.removeOrDownLink(d1P1, d2P2); + assertEquals(isDurable ? LINK_UPDATED : LINK_REMOVED, event.type()); + assertAnnotationsEquals(event.subject().annotations(), isDurable ? DA1 : A1); + LinkEvent event2 = linkStore.removeOrDownLink(d1P1, d2P2); + assertNull(event2); + + assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1)); + assertAnnotationsEquals(linkStore.getLink(d2P2, d1P1).annotations(), + isDurable ? DA2 : A2); + + // annotations, etc. 
should not survive remove + if (!isDurable) { + putLink(linkId1, DIRECT); + assertLink(linkId1, DIRECT, linkStore.getLink(d1P1, d2P2)); + assertAnnotationsEquals(linkStore.getLink(d1P1, d2P2).annotations()); + } + } + + @Test + public final void testRemoveLink() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2); + LinkKey linkId2 = LinkKey.linkKey(d2P2, d1P1); + + putLink(linkId1, DIRECT, A1); + putLink(linkId2, DIRECT, A2); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + LinkEvent event = linkStore.removeLink(d1P1, d2P2); + assertEquals(LINK_REMOVED, event.type()); + assertAnnotationsEquals(event.subject().annotations(), A1); + LinkEvent event2 = linkStore.removeLink(d1P1, d2P2); + assertNull(event2); + + assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1)); + assertAnnotationsEquals(linkStore.getLink(d2P2, d1P1).annotations(), A2); + + // annotations, etc. 
should not survive remove + putLink(linkId1, DIRECT); + assertLink(linkId1, DIRECT, linkStore.getLink(d1P1, d2P2)); + assertAnnotationsEquals(linkStore.getLink(d1P1, d2P2).annotations()); + } + + @Test + public final void testAncillaryVisible() { + ConnectPoint src = new ConnectPoint(DID1, P1); + ConnectPoint dst = new ConnectPoint(DID2, P2); + + // add Ancillary link + linkStore.createOrUpdateLink(PIDA, + new DefaultLinkDescription(src, dst, INDIRECT, A1)); + + // Ancillary only link should not be visible + assertEquals(1, linkStore.getLinkCount()); + assertNotNull(linkStore.getLink(src, dst)); + } + + @Test + public void testDurableToNonDurable() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2); + + putLink(linkId1, DIRECT, DA1); + assertTrue("should be be durable", linkStore.getLink(d1P1, d2P2).isDurable()); + putLink(linkId1, DIRECT, NDA1); + assertFalse("should not be durable", linkStore.getLink(d1P1, d2P2).isDurable()); + } + + @Test + public void testNonDurableToDurable() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2); + + putLink(linkId1, DIRECT, A1); + assertFalse("should not be durable", linkStore.getLink(d1P1, d2P2).isDurable()); + putLink(linkId1, DIRECT, DA1); + assertTrue("should be durable", linkStore.getLink(d1P1, d2P2).isDurable()); + } + + // If Delegates should be called only on remote events, + // then Simple* should never call them, thus not test required. + @Ignore("Ignore until Delegate spec. 
is clear.") + @Test + public final void testEvents() throws InterruptedException { + + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + final LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2); + + final CountDownLatch addLatch = new CountDownLatch(1); + LinkStoreDelegate checkAdd = new LinkStoreDelegate() { + @Override + public void notify(LinkEvent event) { + assertEquals(LINK_ADDED, event.type()); + assertLink(linkId1, INDIRECT, event.subject()); + addLatch.countDown(); + } + }; + final CountDownLatch updateLatch = new CountDownLatch(1); + LinkStoreDelegate checkUpdate = new LinkStoreDelegate() { + @Override + public void notify(LinkEvent event) { + assertEquals(LINK_UPDATED, event.type()); + assertLink(linkId1, DIRECT, event.subject()); + updateLatch.countDown(); + } + }; + final CountDownLatch removeLatch = new CountDownLatch(1); + LinkStoreDelegate checkRemove = new LinkStoreDelegate() { + @Override + public void notify(LinkEvent event) { + assertEquals(LINK_REMOVED, event.type()); + assertLink(linkId1, DIRECT, event.subject()); + removeLatch.countDown(); + } + }; + + linkStore.setDelegate(checkAdd); + putLink(linkId1, INDIRECT); + assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS)); + + linkStore.unsetDelegate(checkAdd); + linkStore.setDelegate(checkUpdate); + putLink(linkId1, DIRECT); + assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS)); + + linkStore.unsetDelegate(checkUpdate); + linkStore.setDelegate(checkRemove); + linkStore.removeOrDownLink(d1P1, d2P2); + assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS)); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java new file mode 100644 index 00000000..ef92ded2 --- /dev/null +++ 
b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java @@ -0,0 +1,388 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED; +import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.joda.time.DateTime; +import org.onlab.packet.IpAddress; +import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.ControllerNode.State; +import org.onosproject.cluster.DefaultControllerNode; +import 
org.onosproject.cluster.NodeId; +import org.onosproject.cluster.RoleInfo; +import org.onosproject.mastership.MastershipEvent; +import org.onosproject.mastership.MastershipStore; +import org.onosproject.mastership.MastershipStoreDelegate; +import org.onosproject.mastership.MastershipTerm; +import org.onosproject.net.DeviceId; +import org.onosproject.net.MastershipRole; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +/** + * Manages inventory of controller mastership over devices using + * trivial, non-distributed in-memory structures implementation. + */ +@Component(immediate = true) +@Service +public class SimpleMastershipStore + extends AbstractStore<MastershipEvent, MastershipStoreDelegate> + implements MastershipStore { + + private final Logger log = getLogger(getClass()); + + private static final int NOTHING = 0; + private static final int INIT = 1; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + //devices mapped to their masters, to emulate multiple nodes + protected final Map<DeviceId, NodeId> masterMap = new HashMap<>(); + //emulate backups with pile of nodes + protected final Map<DeviceId, List<NodeId>> backups = new HashMap<>(); + //terms + protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>(); + + @Activate + public void activate() { + if (clusterService == null) { + // just for ease of unit test + final ControllerNode instance = + new DefaultControllerNode(new NodeId("local"), + IpAddress.valueOf("127.0.0.1")); + + clusterService = new ClusterService() { + + private final DateTime creationTime = DateTime.now(); + + @Override + public ControllerNode getLocalNode() { + return instance; + } + + @Override + public Set<ControllerNode> getNodes() { + return ImmutableSet.of(instance); + } + + @Override + public ControllerNode getNode(NodeId nodeId) { + if 
(instance.id().equals(nodeId)) { + return instance; + } + return null; + } + + @Override + public State getState(NodeId nodeId) { + if (instance.id().equals(nodeId)) { + return State.ACTIVE; + } else { + return State.INACTIVE; + } + } + + @Override + public DateTime getLastUpdated(NodeId nodeId) { + return creationTime; + } + + @Override + public void addListener(ClusterEventListener listener) { + } + + @Override + public void removeListener(ClusterEventListener listener) { + } + }; + } + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public synchronized CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) { + + MastershipRole role = getRole(nodeId, deviceId); + switch (role) { + case MASTER: + // no-op + return CompletableFuture.completedFuture(null); + case STANDBY: + case NONE: + NodeId prevMaster = masterMap.put(deviceId, nodeId); + incrementTerm(deviceId); + removeFromBackups(deviceId, nodeId); + addToBackup(deviceId, prevMaster); + break; + default: + log.warn("unknown Mastership Role {}", role); + return null; + } + + return CompletableFuture.completedFuture( + new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); + } + + @Override + public NodeId getMaster(DeviceId deviceId) { + return masterMap.get(deviceId); + } + + // synchronized for atomic read + @Override + public synchronized RoleInfo getNodes(DeviceId deviceId) { + return new RoleInfo(masterMap.get(deviceId), + backups.getOrDefault(deviceId, ImmutableList.of())); + } + + @Override + public Set<DeviceId> getDevices(NodeId nodeId) { + Set<DeviceId> ids = new HashSet<>(); + for (Map.Entry<DeviceId, NodeId> d : masterMap.entrySet()) { + if (Objects.equals(d.getValue(), nodeId)) { + ids.add(d.getKey()); + } + } + return ids; + } + + @Override + public synchronized CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) { + //query+possible reelection + NodeId node = 
clusterService.getLocalNode().id(); + MastershipRole role = getRole(node, deviceId); + + switch (role) { + case MASTER: + return CompletableFuture.completedFuture(MastershipRole.MASTER); + case STANDBY: + if (getMaster(deviceId) == null) { + // no master => become master + masterMap.put(deviceId, node); + incrementTerm(deviceId); + // remove from backup list + removeFromBackups(deviceId, node); + notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, + getNodes(deviceId))); + return CompletableFuture.completedFuture(MastershipRole.MASTER); + } + return CompletableFuture.completedFuture(MastershipRole.STANDBY); + case NONE: + if (getMaster(deviceId) == null) { + // no master => become master + masterMap.put(deviceId, node); + incrementTerm(deviceId); + notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, + getNodes(deviceId))); + return CompletableFuture.completedFuture(MastershipRole.MASTER); + } + // add to backup list + if (addToBackup(deviceId, node)) { + notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, + getNodes(deviceId))); + } + return CompletableFuture.completedFuture(MastershipRole.STANDBY); + default: + log.warn("unknown Mastership Role {}", role); + } + return CompletableFuture.completedFuture(role); + } + + // add to backup if not there already, silently ignores null node + private synchronized boolean addToBackup(DeviceId deviceId, NodeId nodeId) { + boolean modified = false; + List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>()); + if (nodeId != null && !stbys.contains(nodeId)) { + stbys.add(nodeId); + modified = true; + } + backups.put(deviceId, stbys); + return modified; + } + + private synchronized boolean removeFromBackups(DeviceId deviceId, NodeId node) { + List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>()); + boolean modified = stbys.remove(node); + backups.put(deviceId, stbys); + return modified; + } + + private synchronized void incrementTerm(DeviceId deviceId) { + AtomicInteger 
term = termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)); + term.incrementAndGet(); + termMap.put(deviceId, term); + } + + @Override + public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { + //just query + NodeId current = masterMap.get(deviceId); + MastershipRole role; + + if (current != null && current.equals(nodeId)) { + return MastershipRole.MASTER; + } + + if (backups.getOrDefault(deviceId, Collections.emptyList()).contains(nodeId)) { + role = MastershipRole.STANDBY; + } else { + role = MastershipRole.NONE; + } + return role; + } + + // synchronized for atomic read + @Override + public synchronized MastershipTerm getTermFor(DeviceId deviceId) { + if ((termMap.get(deviceId) == null)) { + return MastershipTerm.of(masterMap.get(deviceId), NOTHING); + } + return MastershipTerm.of( + masterMap.get(deviceId), termMap.get(deviceId).get()); + } + + @Override + public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) { + MastershipRole role = getRole(nodeId, deviceId); + switch (role) { + case MASTER: + NodeId backup = reelect(deviceId, nodeId); + if (backup == null) { + // no master alternative + masterMap.remove(deviceId); + // TODO: Should there be new event type for no MASTER? 
+ return CompletableFuture.completedFuture( + new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); + } else { + NodeId prevMaster = masterMap.put(deviceId, backup); + incrementTerm(deviceId); + addToBackup(deviceId, prevMaster); + return CompletableFuture.completedFuture( + new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); + } + + case STANDBY: + case NONE: + boolean modified = addToBackup(deviceId, nodeId); + if (modified) { + return CompletableFuture.completedFuture( + new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId))); + } + break; + + default: + log.warn("unknown Mastership Role {}", role); + } + return null; + } + + //dumbly selects next-available node that's not the current one + //emulate leader election + private synchronized NodeId reelect(DeviceId did, NodeId nodeId) { + List<NodeId> stbys = backups.getOrDefault(did, Collections.emptyList()); + NodeId backup = null; + for (NodeId n : stbys) { + if (!n.equals(nodeId)) { + backup = n; + break; + } + } + stbys.remove(backup); + return backup; + } + + @Override + public synchronized CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) { + MastershipRole role = getRole(nodeId, deviceId); + switch (role) { + case MASTER: + NodeId backup = reelect(deviceId, nodeId); + masterMap.put(deviceId, backup); + incrementTerm(deviceId); + return CompletableFuture.completedFuture( + new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); + + case STANDBY: + if (removeFromBackups(deviceId, nodeId)) { + return CompletableFuture.completedFuture( + new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId))); + } + break; + + case NONE: + break; + + default: + log.warn("unknown Mastership Role {}", role); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public synchronized void relinquishAllRole(NodeId nodeId) { + List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>(); + Set<DeviceId> 
toRelinquish = new HashSet<>(); + + masterMap.entrySet().stream() + .filter(entry -> nodeId.equals(entry.getValue())) + .forEach(entry -> toRelinquish.add(entry.getKey())); + + backups.entrySet().stream() + .filter(entry -> entry.getValue().contains(nodeId)) + .forEach(entry -> toRelinquish.add(entry.getKey())); + + toRelinquish.forEach(deviceId -> { + eventFutures.add(relinquishRole(nodeId, deviceId)); + }); + + eventFutures.forEach(future -> { + future.whenComplete((event, error) -> { + notifyDelegate(event); + }); + }); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStoreTest.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStoreTest.java new file mode 100644 index 00000000..672fc503 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStoreTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onosproject.cluster.NodeId; +import org.onosproject.mastership.MastershipEvent; +import org.onosproject.mastership.MastershipTerm; +import org.onosproject.net.DeviceId; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.onosproject.mastership.MastershipEvent.Type.*; +import static org.onosproject.net.MastershipRole.*; + +/** + * Test for the simple MastershipStore implementation. + */ +public class SimpleMastershipStoreTest { + + private static final DeviceId DID1 = DeviceId.deviceId("of:01"); + private static final DeviceId DID2 = DeviceId.deviceId("of:02"); + private static final DeviceId DID3 = DeviceId.deviceId("of:03"); + private static final DeviceId DID4 = DeviceId.deviceId("of:04"); + + private static final NodeId N1 = new NodeId("local"); + private static final NodeId N2 = new NodeId("other"); + + private SimpleMastershipStore sms; + + @Before + public void setUp() throws Exception { + sms = new SimpleMastershipStore(); + sms.activate(); + } + + @After + public void tearDown() throws Exception { + sms.deactivate(); + } + + @Test + public void getRole() { + //special case, no backup or master + put(DID1, N1, false, false); + assertEquals("wrong role", NONE, sms.getRole(N1, DID1)); + + //backup exists but we aren't mapped + put(DID2, N1, false, true); + assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2)); + + //N2 is master + put(DID3, N2, true, true); + assertEquals("wrong role", MASTER, sms.getRole(N2, DID3)); + + //N2 is master but N1 is only in backups set + put(DID4, N1, false, true); + 
put(DID4, N2, true, false); + assertEquals("wrong role", STANDBY, sms.getRole(N1, DID4)); + } + + @Test + public void getMaster() { + put(DID3, N2, true, true); + assertEquals("wrong role", MASTER, sms.getRole(N2, DID3)); + assertEquals("wrong device", N2, sms.getMaster(DID3)); + } + + @Test + public void setMaster() { + put(DID1, N1, false, false); + assertEquals("wrong event", MASTER_CHANGED, Futures.getUnchecked(sms.setMaster(N1, DID1)).type()); + assertEquals("wrong role", MASTER, sms.getRole(N1, DID1)); + //set node that's already master - should be ignored + assertNull("wrong event", Futures.getUnchecked(sms.setMaster(N1, DID1))); + + //set STANDBY to MASTER + put(DID2, N1, false, true); + assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2)); + assertEquals("wrong event", MASTER_CHANGED, Futures.getUnchecked(sms.setMaster(N1, DID2)).type()); + assertEquals("wrong role", MASTER, sms.getRole(N1, DID2)); + } + + @Test + public void getDevices() { + Set<DeviceId> d = Sets.newHashSet(DID1, DID2); + + put(DID1, N2, true, true); + put(DID2, N2, true, true); + put(DID3, N1, true, true); + assertTrue("wrong devices", d.equals(sms.getDevices(N2))); + } + + @Test + public void getTermFor() { + put(DID1, N1, true, true); + assertEquals("wrong term", MastershipTerm.of(N1, 0), sms.getTermFor(DID1)); + + //switch to N2 and back - 2 term switches + sms.setMaster(N2, DID1); + sms.setMaster(N1, DID1); + assertEquals("wrong term", MastershipTerm.of(N1, 2), sms.getTermFor(DID1)); + } + + @Test + public void requestRole() { + //NONE - become MASTER + put(DID1, N1, false, false); + assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID1))); + + //was STANDBY - become MASTER + put(DID2, N1, false, true); + assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID2))); + + //other MASTER - stay STANDBY + put(DID3, N2, true, false); + assertEquals("wrong role", STANDBY, Futures.getUnchecked(sms.requestRole(DID3))); + + //local (N1) is 
MASTER - stay MASTER + put(DID4, N1, true, true); + assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID4))); + } + + @Test + public void unsetMaster() { + //NONE - record backup but take no other action + put(DID1, N1, false, false); + sms.setStandby(N1, DID1); + assertTrue("not backed up", sms.backups.get(DID1).contains(N1)); + int prev = sms.termMap.get(DID1).get(); + sms.setStandby(N1, DID1); + assertEquals("term should not change", prev, sms.termMap.get(DID1).get()); + + //no backup, MASTER + put(DID1, N1, true, false); + assertNull("expect no MASTER event", Futures.getUnchecked(sms.setStandby(N1, DID1)).roleInfo().master()); + assertNull("wrong node", sms.masterMap.get(DID1)); + + //backup, switch + sms.masterMap.clear(); + put(DID1, N1, true, true); + put(DID1, N2, false, true); + put(DID2, N2, true, true); + MastershipEvent event = Futures.getUnchecked(sms.setStandby(N1, DID1)); + assertEquals("wrong event", MASTER_CHANGED, event.type()); + assertEquals("wrong master", N2, event.roleInfo().master()); + } + + //helper to populate master/backup structures + private void put(DeviceId dev, NodeId node, boolean master, boolean backup) { + if (master) { + sms.masterMap.put(dev, node); + } else if (backup) { + List<NodeId> stbys = sms.backups.getOrDefault(dev, new ArrayList<>()); + stbys.add(node); + sms.backups.put(dev, stbys); + } + sms.termMap.put(dev, new AtomicInteger()); + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java new file mode 100644 index 00000000..4345abaf --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java @@ -0,0 +1,64 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import com.google.common.collect.Sets; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.packet.OutboundPacket; +import org.onosproject.net.packet.PacketEvent; +import org.onosproject.net.packet.PacketEvent.Type; +import org.onosproject.net.packet.PacketRequest; +import org.onosproject.net.packet.PacketStore; +import org.onosproject.net.packet.PacketStoreDelegate; +import org.onosproject.store.AbstractStore; + + +import java.util.Collections; +import java.util.Set; + +/** + * Simple single instance implementation of the packet store. 
+ */ +@Component(immediate = true) +@Service +public class SimplePacketStore + extends AbstractStore<PacketEvent, PacketStoreDelegate> + implements PacketStore { + + private Set<PacketRequest> requests = Sets.newConcurrentHashSet(); + + @Override + public void emit(OutboundPacket packet) { + notifyDelegate(new PacketEvent(Type.EMIT, packet)); + } + + @Override + public boolean requestPackets(PacketRequest request) { + return requests.add(request); + } + + @Override + public boolean cancelPackets(PacketRequest request) { + return requests.remove(request); + } + + @Override + public Set<PacketRequest> existingRequests() { + return Collections.unmodifiableSet(requests); + } + +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleStatisticStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleStatisticStore.java new file mode 100644 index 00000000..370686f9 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleStatisticStore.java @@ -0,0 +1,211 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import com.google.common.collect.Sets; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.PortNumber; +import org.onosproject.net.flow.FlowEntry; +import org.onosproject.net.flow.FlowRule; +import org.onosproject.net.flow.instructions.Instruction; +import org.onosproject.net.flow.instructions.Instructions; +import org.onosproject.net.statistic.StatisticStore; +import org.slf4j.Logger; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.slf4j.LoggerFactory.getLogger; + + +/** + * Maintains statistics using RPC calls to collect stats from remote instances + * on demand. 
+ */ +@Component(immediate = true) +@Service +public class SimpleStatisticStore implements StatisticStore { + + private final Logger log = getLogger(getClass()); + + private Map<ConnectPoint, InternalStatisticRepresentation> + representations = new ConcurrentHashMap<>(); + + private Map<ConnectPoint, Set<FlowEntry>> previous = new ConcurrentHashMap<>(); + private Map<ConnectPoint, Set<FlowEntry>> current = new ConcurrentHashMap<>(); + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public void prepareForStatistics(FlowRule rule) { + ConnectPoint cp = buildConnectPoint(rule); + if (cp == null) { + return; + } + InternalStatisticRepresentation rep; + synchronized (representations) { + rep = getOrCreateRepresentation(cp); + } + rep.prepare(); + } + + @Override + public synchronized void removeFromStatistics(FlowRule rule) { + ConnectPoint cp = buildConnectPoint(rule); + if (cp == null) { + return; + } + InternalStatisticRepresentation rep = representations.get(cp); + if (rep != null && rep.remove(rule)) { + updatePublishedStats(cp, Collections.emptySet()); + } + Set<FlowEntry> values = current.get(cp); + if (values != null) { + values.remove(rule); + } + values = previous.get(cp); + if (values != null) { + values.remove(rule); + } + + } + + @Override + public void addOrUpdateStatistic(FlowEntry rule) { + ConnectPoint cp = buildConnectPoint(rule); + if (cp == null) { + return; + } + InternalStatisticRepresentation rep = representations.get(cp); + if (rep != null && rep.submit(rule)) { + updatePublishedStats(cp, rep.get()); + } + } + + private synchronized void updatePublishedStats(ConnectPoint cp, + Set<FlowEntry> flowEntries) { + Set<FlowEntry> curr = current.get(cp); + if (curr == null) { + curr = new HashSet<>(); + } + previous.put(cp, curr); + current.put(cp, flowEntries); + + } + + @Override + public Set<FlowEntry> getCurrentStatistic(ConnectPoint 
connectPoint) { + return getCurrentStatisticInternal(connectPoint); + } + + private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) { + return current.get(connectPoint); + } + + @Override + public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) { + return getPreviousStatisticInternal(connectPoint); + } + + private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) { + return previous.get(connectPoint); + } + + private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) { + + if (representations.containsKey(cp)) { + return representations.get(cp); + } else { + InternalStatisticRepresentation rep = new InternalStatisticRepresentation(); + representations.put(cp, rep); + return rep; + } + + } + + private ConnectPoint buildConnectPoint(FlowRule rule) { + PortNumber port = getOutput(rule); + + if (port == null) { + return null; + } + ConnectPoint cp = new ConnectPoint(rule.deviceId(), port); + return cp; + } + + private PortNumber getOutput(FlowRule rule) { + for (Instruction i : rule.treatment().immediate()) { + if (i.type() == Instruction.Type.OUTPUT) { + Instructions.OutputInstruction out = (Instructions.OutputInstruction) i; + return out.port(); + } + if (i.type() == Instruction.Type.DROP) { + return PortNumber.P0; + } + } + return null; + } + + private class InternalStatisticRepresentation { + + private final AtomicInteger counter = new AtomicInteger(0); + private final Set<FlowEntry> rules = new HashSet<>(); + + public void prepare() { + counter.incrementAndGet(); + } + + public synchronized boolean remove(FlowRule rule) { + rules.remove(rule); + return counter.decrementAndGet() == 0; + } + + public synchronized boolean submit(FlowEntry rule) { + if (rules.contains(rule)) { + rules.remove(rule); + } + rules.add(rule); + if (counter.get() == 0) { + return true; + } else { + return counter.decrementAndGet() == 0; + } + } + + public synchronized Set<FlowEntry> 
get() { + counter.set(rules.size()); + return Sets.newHashSet(rules); + } + + } + +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleTopologyStore.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleTopologyStore.java new file mode 100644 index 00000000..6a89c019 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SimpleTopologyStore.java @@ -0,0 +1,157 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.onosproject.store.trivial; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.common.DefaultTopology; +import org.onosproject.event.Event; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Link; +import org.onosproject.net.Path; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.net.topology.ClusterId; +import org.onosproject.net.topology.GraphDescription; +import org.onosproject.net.topology.LinkWeight; +import org.onosproject.net.topology.Topology; +import org.onosproject.net.topology.TopologyCluster; +import org.onosproject.net.topology.TopologyEvent; +import org.onosproject.net.topology.TopologyGraph; +import org.onosproject.net.topology.TopologyStore; +import org.onosproject.net.topology.TopologyStoreDelegate; +import org.onosproject.store.AbstractStore; +import org.slf4j.Logger; + +import java.util.List; +import java.util.Set; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages inventory of topology snapshots using trivial in-memory + * structures implementation. 
+ */ +@Component(immediate = true) +@Service +public class SimpleTopologyStore + extends AbstractStore<TopologyEvent, TopologyStoreDelegate> + implements TopologyStore { + + private final Logger log = getLogger(getClass()); + + private volatile DefaultTopology current; + + @Activate + public void activate() { + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + @Override + public Topology currentTopology() { + return current; + } + + @Override + public boolean isLatest(Topology topology) { + // Topology is current only if it is the same as our current topology + return topology == current; + } + + @Override + public TopologyGraph getGraph(Topology topology) { + return defaultTopology(topology).getGraph(); + } + + @Override + public Set<TopologyCluster> getClusters(Topology topology) { + return defaultTopology(topology).getClusters(); + } + + @Override + public TopologyCluster getCluster(Topology topology, ClusterId clusterId) { + return defaultTopology(topology).getCluster(clusterId); + } + + @Override + public Set<DeviceId> getClusterDevices(Topology topology, TopologyCluster cluster) { + return defaultTopology(topology).getClusterDevices(cluster); + } + + @Override + public Set<Link> getClusterLinks(Topology topology, TopologyCluster cluster) { + return defaultTopology(topology).getClusterLinks(cluster); + } + + @Override + public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst) { + return defaultTopology(topology).getPaths(src, dst); + } + + @Override + public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst, + LinkWeight weight) { + return defaultTopology(topology).getPaths(src, dst, weight); + } + + @Override + public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) { + return defaultTopology(topology).isInfrastructure(connectPoint); + } + + @Override + public boolean isBroadcastPoint(Topology topology, ConnectPoint connectPoint) { + return 
defaultTopology(topology).isBroadcastPoint(connectPoint); + } + + @Override + public TopologyEvent updateTopology(ProviderId providerId, + GraphDescription graphDescription, + List<Event> reasons) { + // First off, make sure that what we're given is indeed newer than + // what we already have. + if (current != null && graphDescription.timestamp() < current.time()) { + return null; + } + + // Have the default topology construct self from the description data. + DefaultTopology newTopology = + new DefaultTopology(providerId, graphDescription); + + // Promote the new topology to current and return a ready-to-send event. + synchronized (this) { + current = newTopology; + return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, + current, reasons); + } + } + + // Validates the specified topology and returns it as a default + private DefaultTopology defaultTopology(Topology topology) { + if (topology instanceof DefaultTopology) { + return (DefaultTopology) topology; + } + throw new IllegalArgumentException("Topology class " + topology.getClass() + + " not supported"); + } + +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SystemClockTimestamp.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SystemClockTimestamp.java new file mode 100644 index 00000000..2ee41945 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/SystemClockTimestamp.java @@ -0,0 +1,83 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.trivial; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ComparisonChain; +import org.onosproject.store.Timestamp; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * A Timestamp that derives its value from the system clock time (in ns) + * on the controller where it is generated. + */ +public class SystemClockTimestamp implements Timestamp { + + private final long nanoTimestamp; + + public SystemClockTimestamp() { + nanoTimestamp = System.nanoTime(); + } + + public SystemClockTimestamp(long timestamp) { + nanoTimestamp = timestamp; + } + + @Override + public int compareTo(Timestamp o) { + checkArgument(o instanceof SystemClockTimestamp, + "Must be SystemClockTimestamp", o); + SystemClockTimestamp that = (SystemClockTimestamp) o; + + return ComparisonChain.start() + .compare(this.nanoTimestamp, that.nanoTimestamp) + .result(); + } + @Override + public int hashCode() { + return Objects.hash(nanoTimestamp); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof SystemClockTimestamp)) { + return false; + } + SystemClockTimestamp that = (SystemClockTimestamp) obj; + return Objects.equals(this.nanoTimestamp, that.nanoTimestamp); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("nanoTimestamp", nanoTimestamp) + .toString(); + } + + public long nanoTimestamp() { + return nanoTimestamp; + } + + public long 
systemTimestamp() { + return nanoTimestamp / 1_000_000; // convert ns to ms + } +} diff --git a/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/package-info.java b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/package-info.java new file mode 100644 index 00000000..2b0c36b8 --- /dev/null +++ b/framework/src/onos/core/common/src/test/java/org/onosproject/store/trivial/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementations of in-memory stores suitable for unit testing and + * experimentation; not for production use. + */ +package org.onosproject.store.trivial; |