summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java334
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java68
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java243
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/package-info.java20
4 files changed, 0 insertions, 665 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
deleted file mode 100644
index 1e5db99c..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.intent.impl;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.net.intent.Intent;
-import org.onosproject.net.intent.IntentData;
-import org.onosproject.net.intent.IntentEvent;
-import org.onosproject.net.intent.IntentState;
-import org.onosproject.net.intent.IntentStore;
-import org.onosproject.net.intent.IntentStoreDelegate;
-import org.onosproject.net.intent.Key;
-import org.onosproject.net.intent.PartitionService;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.service.MultiValuedTimestamp;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.intent.IntentState.PURGE_REQ;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of Intents in a distributed data store that uses optimistic
- * replication and gossip based techniques.
- */
-//FIXME we should listen for leadership changes. if the local instance has just
-// ... become a leader, scan the pending map and process those
-@Component(immediate = true, enabled = true)
-@Service
-public class GossipIntentStore
- extends AbstractStore<IntentEvent, IntentStoreDelegate>
- implements IntentStore {
-
- private final Logger log = getLogger(getClass());
-
- // Map of intent key => current intent state
- private EventuallyConsistentMap<Key, IntentData> currentMap;
-
- // Map of intent key => pending intent operation
- private EventuallyConsistentMap<Key, IntentData> pendingMap;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected PartitionService partitionService;
-
- private final AtomicLong sequenceNumber = new AtomicLong(0);
-
- @Activate
- public void activate() {
- KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(IntentData.class)
- .register(MultiValuedTimestamp.class)
- .register(WallClockTimestamp.class);
-
- currentMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
- .withName("intent-current")
- .withSerializer(intentSerializer)
- .withTimestampProvider((key, intentData) ->
- new MultiValuedTimestamp<>(intentData.version(),
- sequenceNumber.getAndIncrement()))
- .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
- .build();
-
- pendingMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
- .withName("intent-pending")
- .withSerializer(intentSerializer)
- .withTimestampProvider((key, intentData) -> new MultiValuedTimestamp<>(intentData.version(),
- System.nanoTime()))
- .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
- .build();
-
- currentMap.addListener(new InternalCurrentListener());
- pendingMap.addListener(new InternalPendingListener());
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- currentMap.destroy();
- pendingMap.destroy();
-
- log.info("Stopped");
- }
-
- @Override
- public long getIntentCount() {
- return currentMap.size();
- }
-
- @Override
- public Iterable<Intent> getIntents() {
- return currentMap.values().stream()
- .map(IntentData::intent)
- .collect(Collectors.toList());
- }
-
- @Override
- public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) {
- if (localOnly || olderThan > 0) {
- long now = System.currentTimeMillis();
- final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
- return currentMap.values().stream()
- .filter(data -> data.version().isOlderThan(time) &&
- (!localOnly || isMaster(data.key())))
- .collect(Collectors.toList());
- }
- return currentMap.values();
- }
-
- @Override
- public IntentState getIntentState(Key intentKey) {
- IntentData data = currentMap.get(intentKey);
- if (data != null) {
- return data.state();
- }
- return null;
- }
-
- @Override
- public List<Intent> getInstallableIntents(Key intentKey) {
- IntentData data = currentMap.get(intentKey);
- if (data != null) {
- return data.installables();
- }
- return null;
- }
-
-
-
- @Override
- public void write(IntentData newData) {
- checkNotNull(newData);
-
- IntentData currentData = currentMap.get(newData.key());
- if (IntentData.isUpdateAcceptable(currentData, newData)) {
- // Only the master is modifying the current state. Therefore assume
- // this always succeeds
- if (newData.state() == PURGE_REQ) {
- currentMap.remove(newData.key(), currentData);
- } else {
- currentMap.put(newData.key(), new IntentData(newData));
- }
-
- // if current.put succeeded
- pendingMap.remove(newData.key(), newData);
- }
- }
-
- private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
- NodeId master = partitionService.getLeader(key);
- NodeId origin = (data != null) ? data.origin() : null;
- if (data != null && (master == null || origin == null)) {
- log.debug("Intent {} missing master and/or origin; master = {}, origin = {}",
- key, master, origin);
- }
-
- NodeId me = clusterService.getLocalNode().id();
- boolean isMaster = Objects.equals(master, me);
- boolean isOrigin = Objects.equals(origin, me);
- if (isMaster && isOrigin) {
- return getRandomNode();
- } else if (isMaster) {
- return origin != null ? ImmutableList.of(origin) : getRandomNode();
- } else if (isOrigin) {
- return master != null ? ImmutableList.of(master) : getRandomNode();
- } else {
- log.warn("No master or origin for intent {}", key);
- return master != null ? ImmutableList.of(master) : getRandomNode();
- }
- }
-
- private List<NodeId> getRandomNode() {
- NodeId me = clusterService.getLocalNode().id();
- List<NodeId> nodes = clusterService.getNodes().stream()
- .map(ControllerNode::id)
- .filter(node -> !Objects.equals(node, me))
- .collect(Collectors.toList());
- if (nodes.size() == 0) {
- return null;
- }
- return ImmutableList.of(nodes.get(RandomUtils.nextInt(nodes.size())));
- }
-
- @Override
- public void batchWrite(Iterable<IntentData> updates) {
- updates.forEach(this::write);
- }
-
- @Override
- public Intent getIntent(Key key) {
- IntentData data = currentMap.get(key);
- if (data != null) {
- return data.intent();
- }
- return null;
- }
-
- @Override
- public IntentData getIntentData(Key key) {
- IntentData current = currentMap.get(key);
- if (current == null) {
- return null;
- }
- return new IntentData(current);
- }
-
- @Override
- public void addPending(IntentData data) {
- checkNotNull(data);
-
- if (data.version() == null) {
- data.setVersion(new WallClockTimestamp());
- }
- data.setOrigin(clusterService.getLocalNode().id());
- pendingMap.put(data.key(), new IntentData(data));
- }
-
- @Override
- public boolean isMaster(Key intentKey) {
- return partitionService.isMine(intentKey);
- }
-
- @Override
- public Iterable<Intent> getPending() {
- return pendingMap.values().stream()
- .map(IntentData::intent)
- .collect(Collectors.toList());
- }
-
- @Override
- public Iterable<IntentData> getPendingData() {
- return pendingMap.values();
- }
-
- @Override
- public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
- long now = System.currentTimeMillis();
- final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
- return pendingMap.values().stream()
- .filter(data -> data.version().isOlderThan(time) &&
- (!localOnly || isMaster(data.key())))
- .collect(Collectors.toList());
- }
-
- private void notifyDelegateIfNotNull(IntentEvent event) {
- if (event != null) {
- notifyDelegate(event);
- }
- }
-
- private final class InternalCurrentListener implements
- EventuallyConsistentMapListener<Key, IntentData> {
- @Override
- public void event(EventuallyConsistentMapEvent<Key, IntentData> event) {
- IntentData intentData = event.value();
-
- if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
- // The current intents map has been updated. If we are master for
- // this intent's partition, notify the Manager that it should
- // emit notifications about updated tracked resources.
- if (delegate != null && isMaster(event.value().intent().key())) {
- delegate.onUpdate(new IntentData(intentData)); // copy for safety, likely unnecessary
- }
- notifyDelegateIfNotNull(IntentEvent.getEvent(intentData));
- }
- }
- }
-
- private final class InternalPendingListener implements
- EventuallyConsistentMapListener<Key, IntentData> {
- @Override
- public void event(
- EventuallyConsistentMapEvent<Key, IntentData> event) {
- if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
- // The pending intents map has been updated. If we are master for
- // this intent's partition, notify the Manager that it should do
- // some work.
- if (isMaster(event.value().intent().key())) {
- if (delegate != null) {
- delegate.process(new IntentData(event.value()));
- }
- }
-
- notifyDelegateIfNotNull(IntentEvent.getEvent(event.value()));
- }
- }
- }
-
-}
-
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java
deleted file mode 100644
index f6cd198f..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.intent.impl;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Objects;
-
-/**
- * Identifies a partition of the intent keyspace which will be assigned to and
- * processed by a single ONOS instance at a time.
- */
-public class PartitionId {
- private final int id;
-
- /**
- * Creates a new partition ID.
- *
- * @param id the partition ID
- */
- PartitionId(int id) {
- this.id = id;
- }
-
- /**
- * Returns the integer ID value.
- *
- * @return ID value
- */
- public int value() {
- return id;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof PartitionId)) {
- return false;
- }
-
- PartitionId that = (PartitionId) o;
- return Objects.equals(this.id, that.id);
- }
-
- @Override
- public int hashCode() {
- return id;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("partition ID", id)
- .toString();
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
deleted file mode 100644
index 09108d28..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.intent.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.net.intent.Key;
-import org.onosproject.net.intent.PartitionEvent;
-import org.onosproject.net.intent.PartitionEventListener;
-import org.onosproject.net.intent.PartitionService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-/**
- * Manages the assignment of intent keyspace partitions to instances.
- */
-@Component(immediate = true)
-@Service
-public class PartitionManager implements PartitionService {
-
- private static final Logger log = LoggerFactory.getLogger(PartitionManager.class);
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected EventDeliveryService eventDispatcher;
-
- protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
-
- static final int NUM_PARTITIONS = 14;
- private static final int BACKOFF_TIME = 2;
- private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
- private static final int RETRY_AFTER_DELAY_SEC = 5;
-
- private static final String ELECTION_PREFIX = "intent-partition-";
-
- private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
- private LeadershipEventListener leaderListener = new InternalLeadershipListener();
- private ClusterEventListener clusterListener = new InternalClusterEventListener();
-
- private ScheduledExecutorService executor = Executors
- .newScheduledThreadPool(1);
-
- @Activate
- public void activate() {
- leadershipService.addListener(leaderListener);
- clusterService.addListener(clusterListener);
-
- listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
-
- for (int i = 0; i < NUM_PARTITIONS; i++) {
- leadershipService.runForLeadership(getPartitionPath(i));
- }
-
- executor.scheduleAtFixedRate(() -> scheduleRebalance(0), 0,
- CHECK_PARTITION_BALANCE_PERIOD_SEC, TimeUnit.SECONDS);
- }
-
- @Deactivate
- public void deactivate() {
- executor.shutdownNow();
-
- eventDispatcher.removeSink(PartitionEvent.class);
- leadershipService.removeListener(leaderListener);
- clusterService.removeListener(clusterListener);
- }
-
- /**
- * Sets the specified executor to be used for scheduling background tasks.
- *
- * @param executor scheduled executor service for background tasks
- * @return this PartitionManager
- */
- public PartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
- this.executor = executor;
- return this;
- }
-
- private String getPartitionPath(int i) {
- return ELECTION_PREFIX + i;
- }
-
- private String getPartitionPath(PartitionId id) {
- return getPartitionPath(id.value());
- }
-
- private PartitionId getPartitionForKey(Key intentKey) {
- int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS;
- //TODO investigate Guava consistent hash method
- // ... does it add significant computational complexity? is it worth it?
- //int partition = consistentHash(intentKey.hash(), NUM_PARTITIONS);
- PartitionId id = new PartitionId(partition);
- return id;
- }
-
- @Override
- public boolean isMine(Key intentKey) {
- return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
- clusterService.getLocalNode().id());
- }
-
- @Override
- public NodeId getLeader(Key intentKey) {
- return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
- }
-
- @Override
- public void addListener(PartitionEventListener listener) {
- listenerRegistry.addListener(listener);
- }
-
- @Override
- public void removeListener(PartitionEventListener listener) {
- listenerRegistry.removeListener(listener);
- }
-
- protected void doRebalance() {
- rebalanceScheduled.set(false);
- try {
- rebalance();
- } catch (Exception e) {
- log.warn("Exception caught during rebalance task. Will retry in " + RETRY_AFTER_DELAY_SEC + " seconds", e);
- scheduleRebalance(RETRY_AFTER_DELAY_SEC);
- }
- }
-
- /**
- * Determine whether we have more than our fair share of partitions, and if
- * so, relinquish leadership of some of them for a little while to let
- * other instances take over.
- */
- private void rebalance() {
- int activeNodes = (int) clusterService.getNodes()
- .stream()
- .filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id()))
- .count();
-
- int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
-
- List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
- .stream()
- .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
- .filter(l -> l.topic().startsWith(ELECTION_PREFIX))
- .collect(Collectors.toList());
-
- int relinquish = myPartitions.size() - myShare;
-
- if (relinquish <= 0) {
- return;
- }
-
- for (int i = 0; i < relinquish; i++) {
- String topic = myPartitions.get(i).topic();
- leadershipService.withdraw(topic);
-
- executor.schedule(() -> recontest(topic),
- BACKOFF_TIME, TimeUnit.SECONDS);
- }
- }
-
- private void scheduleRebalance(int afterDelaySec) {
- if (rebalanceScheduled.compareAndSet(false, true)) {
- executor.schedule(this::doRebalance, afterDelaySec, TimeUnit.SECONDS);
- }
- }
-
- /**
- * Try and recontest for leadership of a partition.
- *
- * @param path topic name to recontest
- */
- private void recontest(String path) {
- leadershipService.runForLeadership(path);
- }
-
- private final class InternalLeadershipListener implements LeadershipEventListener {
-
- @Override
- public void event(LeadershipEvent event) {
- Leadership leadership = event.subject();
-
- if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
- leadership.topic().startsWith(ELECTION_PREFIX)) {
-
- // See if we need to let some partitions go
- scheduleRebalance(0);
-
- eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
- leadership.topic()));
- }
- }
- }
-
- private final class InternalClusterEventListener implements
- ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- scheduleRebalance(0);
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/package-info.java
deleted file mode 100644
index a8db8ff2..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of distributed intent store.
- */
-package org.onosproject.store.intent.impl;