diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl')
23 files changed, 0 insertions, 3865 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceAntiEntropyAdvertisement.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceAntiEntropyAdvertisement.java deleted file mode 100644 index 491d1334..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceAntiEntropyAdvertisement.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Map; - -import org.onosproject.cluster.NodeId; -import org.onosproject.net.DeviceId; -import org.onosproject.store.Timestamp; - - -/** - * Device Advertisement message. - */ -public class DeviceAntiEntropyAdvertisement { - - private final NodeId sender; - private final Map<DeviceFragmentId, Timestamp> deviceFingerPrints; - private final Map<PortFragmentId, Timestamp> portFingerPrints; - private final Map<DeviceId, Timestamp> offline; - - - public DeviceAntiEntropyAdvertisement(NodeId sender, - Map<DeviceFragmentId, Timestamp> devices, - Map<PortFragmentId, Timestamp> ports, - Map<DeviceId, Timestamp> offline) { - this.sender = checkNotNull(sender); - this.deviceFingerPrints = checkNotNull(devices); - this.portFingerPrints = checkNotNull(ports); - this.offline = checkNotNull(offline); - } - - public NodeId sender() { - return sender; - } - - public Map<DeviceFragmentId, Timestamp> deviceFingerPrints() { - return deviceFingerPrints; - } - - public Map<PortFragmentId, Timestamp> ports() { - return portFingerPrints; - } - - public Map<DeviceId, Timestamp> offline() { - return offline; - } - - // For serializer - @SuppressWarnings("unused") - private DeviceAntiEntropyAdvertisement() { - this.sender = null; - this.deviceFingerPrints = null; - this.portFingerPrints = null; - this.offline = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceAntiEntropyRequest.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceAntiEntropyRequest.java deleted file mode 100644 index a719a770..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceAntiEntropyRequest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collection; - -import org.onosproject.cluster.NodeId; - -/** - * Message to request for other peers information. - */ -public class DeviceAntiEntropyRequest { - - private final NodeId sender; - private final Collection<DeviceFragmentId> devices; - private final Collection<PortFragmentId> ports; - - public DeviceAntiEntropyRequest(NodeId sender, - Collection<DeviceFragmentId> devices, - Collection<PortFragmentId> ports) { - - this.sender = checkNotNull(sender); - this.devices = checkNotNull(devices); - this.ports = checkNotNull(ports); - } - - public NodeId sender() { - return sender; - } - - public Collection<DeviceFragmentId> devices() { - return devices; - } - - public Collection<PortFragmentId> ports() { - return ports; - } - - // For serializer - @SuppressWarnings("unused") - private DeviceAntiEntropyRequest() { - this.sender = null; - this.devices = null; - this.ports = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceClockManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceClockManager.java deleted file mode 100644 index da5bd5de..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceClockManager.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.mastership.MastershipTerm; -import org.onosproject.mastership.MastershipTermService; -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.DeviceClockService; -import org.onosproject.store.Timestamp; -import org.onosproject.store.impl.MastershipBasedTimestamp; -import org.slf4j.Logger; - -/** - * Clock service to issue Timestamp based on Device Mastership. - */ -@Component(immediate = true) -@Service -public class DeviceClockManager implements DeviceClockService { - - private final Logger log = getLogger(getClass()); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipTermService mastershipTermService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - protected NodeId localNodeId; - - private final AtomicLong ticker = new AtomicLong(0); - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - log.info("Stopped"); - } - - @Override - public Timestamp getTimestamp(DeviceId deviceId) { - MastershipTerm term = mastershipTermService.getMastershipTerm(deviceId); - if (term == null || !localNodeId.equals(term.master())) { - throw new IllegalStateException("Requesting timestamp for " + deviceId + " without mastership"); - } - return new MastershipBasedTimestamp(term.termNumber(), ticker.incrementAndGet()); - } - - @Override - public boolean isTimestampAvailable(DeviceId deviceId) { - MastershipTerm term = mastershipTermService.getMastershipTerm(deviceId); - return term != null && localNodeId.equals(term.master()); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceDescriptions.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceDescriptions.java deleted file mode 100644 index 23206725..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceDescriptions.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.net.DefaultAnnotations.union; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.onosproject.net.PortNumber; -import org.onosproject.net.SparseAnnotations; -import org.onosproject.net.device.DefaultDeviceDescription; -import org.onosproject.net.device.DefaultPortDescription; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.device.OchPortDescription; -import org.onosproject.net.device.OduCltPortDescription; -import org.onosproject.net.device.OmsPortDescription; -import org.onosproject.net.device.PortDescription; -import org.onosproject.store.Timestamp; -import org.onosproject.store.impl.Timestamped; - -/* - * Collection of Description of a Device and Ports, given from a Provider. - */ -class DeviceDescriptions { - - private volatile Timestamped<DeviceDescription> deviceDesc; - - private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs; - - public DeviceDescriptions(Timestamped<DeviceDescription> desc) { - this.deviceDesc = checkNotNull(desc); - this.portDescs = new ConcurrentHashMap<>(); - } - - public Timestamp getLatestTimestamp() { - Timestamp latest = deviceDesc.timestamp(); - for (Timestamped<PortDescription> desc : portDescs.values()) { - if (desc.timestamp().compareTo(latest) > 0) { - latest = desc.timestamp(); - } - } - return latest; - } - - public Timestamped<DeviceDescription> getDeviceDesc() { - return deviceDesc; - } - - public Timestamped<PortDescription> getPortDesc(PortNumber number) { - return portDescs.get(number); - } - - public Map<PortNumber, Timestamped<PortDescription>> getPortDescs() { - return Collections.unmodifiableMap(portDescs); - } - - /** - * Puts DeviceDescription, merging annotations as necessary. - * - * @param newDesc new DeviceDescription - */ - public void putDeviceDesc(Timestamped<DeviceDescription> newDesc) { - Timestamped<DeviceDescription> oldOne = deviceDesc; - Timestamped<DeviceDescription> newOne = newDesc; - if (oldOne != null) { - SparseAnnotations merged = union(oldOne.value().annotations(), - newDesc.value().annotations()); - newOne = new Timestamped<>( - new DefaultDeviceDescription(newDesc.value(), merged), - newDesc.timestamp()); - } - deviceDesc = newOne; - } - - /** - * Puts PortDescription, merging annotations as necessary. - * - * @param newDesc new PortDescription - */ - public void putPortDesc(Timestamped<PortDescription> newDesc) { - Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber()); - Timestamped<PortDescription> newOne = newDesc; - if (oldOne != null) { - SparseAnnotations merged = union(oldOne.value().annotations(), - newDesc.value().annotations()); - newOne = null; - switch (newDesc.value().type()) { - case OMS: - OmsPortDescription omsDesc = (OmsPortDescription) (newDesc.value()); - newOne = new Timestamped<>( - new OmsPortDescription( - omsDesc, omsDesc.minFrequency(), omsDesc.maxFrequency(), omsDesc.grid(), merged), - newDesc.timestamp()); - break; - case OCH: - OchPortDescription ochDesc = (OchPortDescription) (newDesc.value()); - newOne = new Timestamped<>( - new OchPortDescription( - ochDesc, ochDesc.signalType(), ochDesc.isTunable(), ochDesc.lambda(), merged), - newDesc.timestamp()); - break; - case ODUCLT: - OduCltPortDescription ocDesc = (OduCltPortDescription) (newDesc.value()); - newOne = new Timestamped<>( - new OduCltPortDescription( - ocDesc, ocDesc.signalType(), merged), - newDesc.timestamp()); - break; - default: - newOne = new Timestamped<>( - new DefaultPortDescription(newDesc.value(), merged), - newDesc.timestamp()); - } - } - portDescs.put(newOne.value().portNumber(), newOne); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceFragmentId.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceFragmentId.java deleted file mode 100644 index 214f4c23..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceFragmentId.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import java.util.Objects; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.provider.ProviderId; - -import com.google.common.base.MoreObjects; - -/** - * Identifier for DeviceDesctiption from a Provider. - */ -public final class DeviceFragmentId { - public final ProviderId providerId; - public final DeviceId deviceId; - - public DeviceFragmentId(DeviceId deviceId, ProviderId providerId) { - this.providerId = providerId; - this.deviceId = deviceId; - } - - @Override - public int hashCode() { - return Objects.hash(providerId, deviceId); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof DeviceFragmentId)) { - return false; - } - DeviceFragmentId that = (DeviceFragmentId) obj; - return Objects.equals(this.deviceId, that.deviceId) && - Objects.equals(this.providerId, that.providerId); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .toString(); - } - - // for serializer - @SuppressWarnings("unused") - private DeviceFragmentId() { - this.providerId = null; - this.deviceId = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java deleted file mode 100644 index 3c3bbb65..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import com.google.common.base.MoreObjects; -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.provider.ProviderId; - -public class DeviceInjectedEvent { - private final ProviderId providerId; - private final DeviceId deviceId; - private final DeviceDescription deviceDescription; - - protected DeviceInjectedEvent( - ProviderId providerId, - DeviceId deviceId, - DeviceDescription deviceDescription) { - this.providerId = providerId; - this.deviceId = deviceId; - this.deviceDescription = deviceDescription; - } - - public DeviceId deviceId() { - return deviceId; - } - - public ProviderId providerId() { - return providerId; - } - - public DeviceDescription deviceDescription() { - return deviceDescription; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .add("deviceDescription", deviceDescription) - .toString(); - } - - // for serializer - protected DeviceInjectedEvent() { - this.providerId = null; - this.deviceId = null; - this.deviceDescription = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceKey.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceKey.java deleted file mode 100644 index 0896bf18..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceKey.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import java.util.Objects; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.provider.ProviderId; - -import com.google.common.base.MoreObjects; - -/** - * Key for DeviceDescriptions in ECDeviceStore. - */ -public class DeviceKey { - private final ProviderId providerId; - private final DeviceId deviceId; - - public DeviceKey(ProviderId providerId, DeviceId deviceId) { - this.providerId = providerId; - this.deviceId = deviceId; - } - - public ProviderId providerId() { - return providerId; - } - - public DeviceId deviceId() { - return deviceId; - } - - @Override - public int hashCode() { - return Objects.hash(providerId, deviceId); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof DeviceKey)) { - return false; - } - DeviceKey that = (DeviceKey) obj; - return Objects.equals(this.deviceId, that.deviceId) && - Objects.equals(this.providerId, that.providerId); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .toString(); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java deleted file mode 100644 index 2dae55bb..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java +++ /dev/null @@ -1,784 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Verify.verify; -import static org.onosproject.net.DefaultAnnotations.merge; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.packet.ChassisId; -import org.onlab.util.KryoNamespace; -import org.onlab.util.SharedExecutors; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.mastership.MastershipService; -import org.onosproject.mastership.MastershipTermService; -import org.onosproject.net.Annotations; -import org.onosproject.net.AnnotationsUtil; -import org.onosproject.net.DefaultAnnotations; -import org.onosproject.net.DefaultDevice; -import org.onosproject.net.DefaultPort; -import org.onosproject.net.Device; -import org.onosproject.net.DeviceId; -import org.onosproject.net.MastershipRole; -import org.onosproject.net.OchPort; -import org.onosproject.net.OduCltPort; -import org.onosproject.net.OmsPort; -import org.onosproject.net.Port; -import org.onosproject.net.PortNumber; -import org.onosproject.net.Device.Type; -import org.onosproject.net.device.DefaultPortStatistics; -import org.onosproject.net.device.DeviceClockService; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.device.DeviceEvent; -import org.onosproject.net.device.DeviceStore; -import org.onosproject.net.device.DeviceStoreDelegate; -import org.onosproject.net.device.OchPortDescription; -import org.onosproject.net.device.OduCltPortDescription; -import org.onosproject.net.device.OmsPortDescription; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.device.PortStatistics; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.AbstractStore; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.impl.MastershipBasedTimestamp; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.serializers.custom.DistributedStoreSerializers; -import org.onosproject.store.service.DistributedSet; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.SetEvent; -import org.onosproject.store.service.SetEventListener; -import org.onosproject.store.service.WallClockTimestamp; - -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; - -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.StorageService; -import org.slf4j.Logger; - -import static org.onosproject.net.device.DeviceEvent.Type.*; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; - -/** - * Manages the inventory of devices using a {@code EventuallyConsistentMap}. - */ -@Component(immediate = true, enabled = false) -@Service -public class ECDeviceStore - extends AbstractStore<DeviceEvent, DeviceStoreDelegate> - implements DeviceStore { - - private final Logger log = getLogger(getClass()); - - private static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; - - private final Map<DeviceId, Device> devices = Maps.newConcurrentMap(); - private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap(); - Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet(); - - private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions; - private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions; - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats; - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats; - - private DistributedSet<DeviceId> availableDevices; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipTermService mastershipTermService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected DeviceClockService deviceClockService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - private NodeId localNodeId; - private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener = - new InternalDeviceChangeEventListener(); - private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener = - new InternalPortChangeEventListener(); - private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener = - new InternalPortStatsListener(); - private final SetEventListener<DeviceId> deviceStatusTracker = - new InternalDeviceStatusTracker(); - - protected static final KryoSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(DistributedStoreSerializers.STORE_COMMON) - .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) - .register(DeviceInjectedEvent.class) - .register(PortInjectedEvent.class) - .build(); - } - }; - - protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder() - .register(KryoNamespaces.API) - .register(DeviceKey.class) - .register(PortKey.class) - .register(DeviceKey.class) - .register(PortKey.class) - .register(MastershipBasedTimestamp.class); - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - - deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder() - .withName("onos-device-descriptions") - .withSerializer(SERIALIZER_BUILDER) - .withTimestampProvider((k, v) -> { - try { - return deviceClockService.getTimestamp(k.deviceId()); - } catch (IllegalStateException e) { - return null; - } - }).build(); - - portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder() - .withName("onos-port-descriptions") - .withSerializer(SERIALIZER_BUILDER) - .withTimestampProvider((k, v) -> { - try { - return deviceClockService.getTimestamp(k.deviceId()); - } catch (IllegalStateException e) { - return null; - } - }).build(); - - devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder() - .withName("onos-port-stats") - .withSerializer(SERIALIZER_BUILDER) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - - devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>> - eventuallyConsistentMapBuilder() - .withName("onos-port-stats-delta") - .withSerializer(SERIALIZER_BUILDER) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - - clusterCommunicator.addSubscriber(DEVICE_INJECTED, - SERIALIZER::decode, - this::injectDevice, - SERIALIZER::encode, - SharedExecutors.getPoolThreadExecutor()); - - clusterCommunicator.addSubscriber(PORT_INJECTED, - SERIALIZER::decode, - this::injectPort, - SERIALIZER::encode, - SharedExecutors.getPoolThreadExecutor()); - - availableDevices = storageService.<DeviceId>setBuilder() - .withName("onos-online-devices") - .withSerializer(Serializer.using(KryoNamespaces.API)) - .withPartitionsDisabled() - .withRelaxedReadConsistency() - .build(); - - deviceDescriptions.addListener(deviceUpdateListener); - portDescriptions.addListener(portUpdateListener); - devicePortStats.addListener(portStatsListener); - availableDevices.addListener(deviceStatusTracker); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - devicePortStats.removeListener(portStatsListener); - deviceDescriptions.removeListener(deviceUpdateListener); - portDescriptions.removeListener(portUpdateListener); - availableDevices.removeListener(deviceStatusTracker); - devicePortStats.destroy(); - devicePortDeltaStats.destroy(); - deviceDescriptions.destroy(); - portDescriptions.destroy(); - devices.clear(); - devicePorts.clear(); - clusterCommunicator.removeSubscriber(DEVICE_INJECTED); - clusterCommunicator.removeSubscriber(PORT_INJECTED); - log.info("Stopped"); - } - - @Override - public Iterable<Device> getDevices() { - return devices.values(); - } - - @Override - public int getDeviceCount() { - return devices.size(); - } - - @Override - public Device getDevice(DeviceId deviceId) { - return devices.get(deviceId); - } - - @Override - public DeviceEvent createOrUpdateDevice(ProviderId providerId, - DeviceId deviceId, - DeviceDescription deviceDescription) { - NodeId master = mastershipService.getMasterFor(deviceId); - if (localNodeId.equals(master)) { - deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription); - return refreshDeviceCache(providerId, deviceId); - } else { - DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription); - return Futures.getUnchecked( - clusterCommunicator.sendAndReceive(deviceInjectedEvent, - DEVICE_INJECTED, - SERIALIZER::encode, - SERIALIZER::decode, - master)); - } - } - - private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) { - AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>(); - Device device = devices.compute(deviceId, (k, existingDevice) -> { - Device newDevice = composeDevice(deviceId); - if (existingDevice == null) { - eventType.set(DEVICE_ADDED); - } else { - // We allow only certain attributes to trigger update - boolean propertiesChanged = - !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) || - !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) || - !Objects.equals(existingDevice.providerId(), newDevice.providerId()); - boolean annotationsChanged = - !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations()); - - // Primary providers can respond to all changes, but ancillary ones - // should respond only to annotation changes. - if ((providerId.isAncillary() && annotationsChanged) || - (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) { - boolean replaced = devices.replace(deviceId, existingDevice, newDevice); - verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", - providerId, existingDevice, devices.get(deviceId), newDevice); - eventType.set(DEVICE_UPDATED); - } - } - return newDevice; - }); - if (eventType.get() != null && !providerId.isAncillary()) { - markOnline(deviceId); - } - return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null; - } - - /** - * Returns the primary providerId for a device. - * @param deviceId device identifier - * @return primary providerId - */ - private Set<ProviderId> getAllProviders(DeviceId deviceId) { - return deviceDescriptions.keySet() - .stream() - .filter(deviceKey -> deviceKey.deviceId().equals(deviceId)) - .map(deviceKey -> deviceKey.providerId()) - .collect(Collectors.toSet()); - } - - /** - * Returns the identifier for all providers for a device. - * @param deviceId device identifier - * @return set of provider identifiers - */ - private ProviderId getPrimaryProviderId(DeviceId deviceId) { - Set<ProviderId> allProviderIds = getAllProviders(deviceId); - return allProviderIds.stream() - .filter(p -> !p.isAncillary()) - .findFirst() - .orElse(Iterables.getFirst(allProviderIds, null)); - } - - /** - * Returns a Device, merging descriptions from multiple Providers. - * - * @param deviceId device identifier - * @return Device instance - */ - private Device composeDevice(DeviceId deviceId) { - - ProviderId primaryProviderId = getPrimaryProviderId(deviceId); - DeviceDescription primaryDeviceDescription = - deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId)); - - Type type = primaryDeviceDescription.type(); - String manufacturer = primaryDeviceDescription.manufacturer(); - String hwVersion = primaryDeviceDescription.hwVersion(); - String swVersion = primaryDeviceDescription.swVersion(); - String serialNumber = primaryDeviceDescription.serialNumber(); - ChassisId chassisId = primaryDeviceDescription.chassisId(); - DefaultAnnotations annotations = mergeAnnotations(deviceId); - - return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer, - hwVersion, swVersion, serialNumber, - chassisId, annotations); - } - - private DeviceEvent purgeDeviceCache(DeviceId deviceId) { - Device removedDevice = devices.remove(deviceId); - if (removedDevice != null) { - getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId))); - return new DeviceEvent(DEVICE_REMOVED, removedDevice); - } - return null; - } - - private boolean markOnline(DeviceId deviceId) { - return availableDevices.add(deviceId); - } - - @Override - public DeviceEvent markOffline(DeviceId deviceId) { - availableDevices.remove(deviceId); - // set update listener will raise the event. - return null; - } - - @Override - public List<DeviceEvent> updatePorts(ProviderId providerId, - DeviceId deviceId, - List<PortDescription> descriptions) { - NodeId master = mastershipService.getMasterFor(deviceId); - List<DeviceEvent> deviceEvents = null; - if (localNodeId.equals(master)) { - descriptions.forEach(description -> { - PortKey portKey = new PortKey(providerId, deviceId, description.portNumber()); - portDescriptions.put(portKey, description); - }); - deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty()); - } else { - if (master == null) { - return Collections.emptyList(); - } - PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions); - deviceEvents = Futures.getUnchecked( - clusterCommunicator.sendAndReceive(portInjectedEvent, - PORT_INJECTED, - SERIALIZER::encode, - SERIALIZER::decode, - master)); - } - return deviceEvents == null ? Collections.emptyList() : deviceEvents; - } - - private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId, - DeviceId deviceId, - Optional<PortNumber> portNumber) { - Device device = devices.get(deviceId); - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); - List<DeviceEvent> events = Lists.newArrayList(); - - Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap()); - List<PortDescription> descriptions = Lists.newArrayList(); - portDescriptions.entrySet().forEach(e -> { - PortKey key = e.getKey(); - PortDescription value = e.getValue(); - if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) { - if (portNumber.isPresent()) { - if (portNumber.get().equals(key.portNumber())) { - descriptions.add(value); - } - } else { - descriptions.add(value); - } - } - }); - - for (PortDescription description : descriptions) { - final PortNumber number = description.portNumber(); - ports.compute(number, (k, existingPort) -> { - Port newPort = composePort(device, number); - if (existingPort == null) { - events.add(new DeviceEvent(PORT_ADDED, device, newPort)); - } else { - if (existingPort.isEnabled() != newPort.isEnabled() || - existingPort.type() != newPort.type() || - existingPort.portSpeed() != newPort.portSpeed() || - !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) { - events.add(new DeviceEvent(PORT_UPDATED, device, newPort)); - } - } - return newPort; - }); - } - - return events; - } - - /** - * Returns a Port, merging descriptions from multiple Providers. - * - * @param device device the port is on - * @param number port number - * @return Port instance - */ - private Port composePort(Device device, PortNumber number) { - - Map<ProviderId, PortDescription> descriptions = Maps.newHashMap(); - portDescriptions.entrySet().forEach(entry -> { - PortKey portKey = entry.getKey(); - if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) { - descriptions.put(portKey.providerId(), entry.getValue()); - } - }); - ProviderId primary = getPrimaryProviderId(device.id()); - PortDescription primaryDescription = descriptions.get(primary); - - // if no primary, assume not enabled - boolean isEnabled = false; - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - if (primaryDescription != null) { - isEnabled = primaryDescription.isEnabled(); - annotations = merge(annotations, primaryDescription.annotations()); - } - Port updated = null; - for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) { - if (e.getKey().equals(primary)) { - continue; - } - annotations = merge(annotations, e.getValue().annotations()); - updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations); - } - if (primaryDescription == null) { - return updated == null ? new DefaultPort(device, number, false, annotations) : updated; - } - return updated == null - ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations) - : updated; - } - - private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled, - PortDescription description, Annotations annotations) { - switch (description.type()) { - case OMS: - OmsPortDescription omsDesc = (OmsPortDescription) description; - return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(), - omsDesc.maxFrequency(), omsDesc.grid(), annotations); - case OCH: - OchPortDescription ochDesc = (OchPortDescription) description; - return new OchPort(device, number, isEnabled, ochDesc.signalType(), - ochDesc.isTunable(), ochDesc.lambda(), annotations); - case ODUCLT: - OduCltPortDescription oduDesc = (OduCltPortDescription) description; - return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations); - default: - return new DefaultPort(device, number, isEnabled, description.type(), - description.portSpeed(), annotations); - } - } - - @Override - public DeviceEvent updatePortStatus(ProviderId providerId, - DeviceId deviceId, - PortDescription portDescription) { - portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription); - List<DeviceEvent> events = - refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber())); - return Iterables.getFirst(events, null); - } - - @Override - public List<Port> getPorts(DeviceId deviceId) { - return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values()); - } - - @Override - public Port getPort(DeviceId deviceId, PortNumber portNumber) { - return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber); - } - - @Override - public DeviceEvent updatePortStatistics(ProviderId providerId, - DeviceId deviceId, - Collection<PortStatistics> newStatsCollection) { - - Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId); - Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap(); - Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap(); - - if (prvStatsMap != null) { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - PortStatistics prvStats = prvStatsMap.get(port); - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - PortStatistics deltaStats = builder.build(); - if (prvStats != null) { - deltaStats = calcDeltaStats(deviceId, prvStats, newStats); - } - deltaStatsMap.put(port, deltaStats); - newStatsMap.put(port, newStats); - } - } else { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - newStatsMap.put(port, newStats); - } - } - devicePortDeltaStats.put(deviceId, deltaStatsMap); - devicePortStats.put(deviceId, newStatsMap); - // DeviceEvent returns null because of InternalPortStatsListener usage - return null; - } - - /** - * Calculate delta statistics by subtracting previous from new statistics. - * - * @param deviceId device indentifier - * @param prvStats previous port statistics - * @param newStats new port statistics - * @return PortStatistics - */ - public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) { - // calculate time difference - long deltaStatsSec, deltaStatsNano; - if (newStats.durationNano() < prvStats.durationNano()) { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L; - } else { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano(); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec(); - } - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId) - .setPort(newStats.port()) - .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived()) - .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent()) - .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived()) - .setBytesSent(newStats.bytesSent() - prvStats.bytesSent()) - .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped()) - .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped()) - .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors()) - .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors()) - .setDurationSec(deltaStatsSec) - .setDurationNano(deltaStatsNano) - .build(); - return deltaStats; - } - - @Override - public List<PortStatistics> getPortStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public boolean isAvailable(DeviceId deviceId) { - return availableDevices.contains(deviceId); - } - - @Override - public Iterable<Device> getAvailableDevices() { - return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null); - } - - @Override - public DeviceEvent removeDevice(DeviceId deviceId) { - NodeId master = mastershipService.getMasterFor(deviceId); - // if there exist a master, forward - // if there is no master, try to become one and process - boolean relinquishAtEnd = false; - if (master == null) { - final MastershipRole myRole = mastershipService.getLocalRole(deviceId); - if (myRole != MastershipRole.NONE) { - relinquishAtEnd = true; - } - log.debug("Temporarily requesting role for {} to remove", deviceId); - MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId)); - if (role == MastershipRole.MASTER) { - master = localNodeId; - } - } - - if (!localNodeId.equals(master)) { - log.debug("{} has control of {}, forwarding remove request", - master, deviceId); - - clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master) - .whenComplete((r, e) -> { - if (e != null) { - log.error("Failed to forward {} remove request to its master", deviceId, e); - } - }); - return null; - } - - // I have control.. - DeviceEvent event = null; - final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId); - DeviceDescription removedDeviceDescription = - deviceDescriptions.remove(deviceKey); - if (removedDeviceDescription != null) { - event = purgeDeviceCache(deviceId); - } - - if (relinquishAtEnd) { - log.debug("Relinquishing temporary role acquired for {}", deviceId); - mastershipService.relinquishMastership(deviceId); - } - return event; - } - - private DeviceEvent injectDevice(DeviceInjectedEvent event) { - return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription()); - } - - private List<DeviceEvent> injectPort(PortInjectedEvent event) { - return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions()); - } - - private DefaultAnnotations mergeAnnotations(DeviceId deviceId) { - ProviderId primaryProviderId = getPrimaryProviderId(deviceId); - DeviceDescription primaryDeviceDescription = - deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId)); - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - annotations = merge(annotations, primaryDeviceDescription.annotations()); - for (ProviderId providerId : getAllProviders(deviceId)) { - if (!providerId.equals(primaryProviderId)) { - annotations = merge(annotations, - deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations()); - } - } - return annotations; - } - - private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> { - @Override - public void event(SetEvent<DeviceId> event) { - final DeviceId deviceId = event.entry(); - final Device device = devices.get(deviceId); - if (device != null) { - notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device)); - } else { - pendingAvailableChangeUpdates.add(deviceId); - } - } - } - - private class InternalDeviceChangeEventListener - implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> { - @Override - public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) { - DeviceId deviceId = event.key().deviceId(); - ProviderId providerId = event.key().providerId(); - if (event.type() == PUT) { - notifyDelegate(refreshDeviceCache(providerId, deviceId)); - if (pendingAvailableChangeUpdates.remove(deviceId)) { - notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId))); - } - } else if (event.type() == REMOVE) { - notifyDelegate(purgeDeviceCache(deviceId)); - } - } - } - - private class InternalPortChangeEventListener - implements EventuallyConsistentMapListener<PortKey, PortDescription> { - @Override - public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) { - DeviceId deviceId = event.key().deviceId(); - ProviderId providerId = event.key().providerId(); - PortNumber portNumber = event.key().portNumber(); - if (event.type() == PUT) { - if (devices.containsKey(deviceId)) { - List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber)); - for (DeviceEvent deviceEvent : events) { - notifyDelegate(deviceEvent); - } - } - } else if (event.type() == REMOVE) { - log.warn("Unexpected port removed event"); - } - } - } - - private class InternalPortStatsListener - implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> { - @Override - public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) { - if (event.type() == PUT) { - Device device = devices.get(event.key()); - if (device != null) { - delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device)); - } - } - } - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java deleted file mode 100644 index a9a9098e..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java +++ /dev/null @@ -1,1672 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.apache.commons.lang3.RandomUtils; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.packet.ChassisId; -import org.onlab.util.KryoNamespace; -import org.onlab.util.NewConcurrentHashMap; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.NodeId; -import org.onosproject.mastership.MastershipService; -import org.onosproject.mastership.MastershipTerm; -import org.onosproject.mastership.MastershipTermService; -import org.onosproject.net.Annotations; -import org.onosproject.net.AnnotationsUtil; -import org.onosproject.net.DefaultAnnotations; -import org.onosproject.net.DefaultDevice; -import org.onosproject.net.DefaultPort; -import org.onosproject.net.Device; -import org.onosproject.net.Device.Type; -import org.onosproject.net.DeviceId; -import org.onosproject.net.MastershipRole; -import org.onosproject.net.OchPort; -import org.onosproject.net.OduCltPort; -import org.onosproject.net.OmsPort; -import org.onosproject.net.Port; -import org.onosproject.net.PortNumber; -import org.onosproject.net.device.DefaultPortStatistics; -import org.onosproject.net.device.DeviceClockService; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.device.DeviceEvent; -import org.onosproject.net.device.DeviceStore; -import org.onosproject.net.device.DeviceStoreDelegate; -import org.onosproject.net.device.OchPortDescription; -import org.onosproject.net.device.OduCltPortDescription; -import org.onosproject.net.device.OmsPortDescription; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.device.PortStatistics; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.AbstractStore; -import org.onosproject.store.Timestamp; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.ClusterMessage; -import org.onosproject.store.cluster.messaging.ClusterMessageHandler; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.impl.Timestamped; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.serializers.custom.DistributedStoreSerializers; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.MultiValuedTimestamp; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.WallClockTimestamp; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Predicates.notNull; -import static com.google.common.base.Verify.verify; -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; -import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; -import static org.onlab.util.Tools.groupedThreads; -import static org.onlab.util.Tools.minPriority; -import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId; -import static org.onosproject.net.DefaultAnnotations.merge; -import static org.onosproject.net.device.DeviceEvent.Type.*; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.slf4j.LoggerFactory.getLogger; - -/** - * Manages inventory of infrastructure devices using gossip protocol to distribute - * information. - */ -@Component(immediate = true) -@Service -public class GossipDeviceStore - extends AbstractStore<DeviceEvent, DeviceStoreDelegate> - implements DeviceStore { - - private final Logger log = getLogger(getClass()); - - private static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; - // Timeout in milliseconds to process device or ports on remote master node - private static final int REMOTE_MASTER_TIMEOUT = 1000; - - // innerMap is used to lock a Device, thus instance should never be replaced. - // collection of Description given from various providers - private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>> - deviceDescs = Maps.newConcurrentMap(); - - // cache of Device and Ports generated by compositing descriptions from providers - private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap(); - private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap(); - - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats; - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats; - private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> - portStatsListener = new InternalPortStatsListener(); - - // to be updated under Device lock - private final Map<DeviceId, Timestamp> offline = Maps.newHashMap(); - private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap(); - - // available(=UP) devices - private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet(); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected DeviceClockService deviceClockService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipTermService termService; - - - protected static final KryoSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(DistributedStoreSerializers.STORE_COMMON) - .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) - .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class) - .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class) - .register(InternalDeviceRemovedEvent.class) - .register(new InternalPortEventSerializer(), InternalPortEvent.class) - .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class) - .register(DeviceAntiEntropyAdvertisement.class) - .register(DeviceFragmentId.class) - .register(PortFragmentId.class) - .register(DeviceInjectedEvent.class) - .register(PortInjectedEvent.class) - .build(); - } - }; - - private ExecutorService executor; - - private ScheduledExecutorService backgroundExecutor; - - // TODO make these anti-entropy parameters configurable - private long initialDelaySec = 5; - private long periodSec = 5; - - @Activate - public void activate() { - executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d")); - - backgroundExecutor = - newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d"))); - - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, - new InternalDeviceOfflineEventListener(), - executor); - clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, - new InternalRemoveRequestListener(), - executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, - new InternalDeviceAdvertisementListener(), - backgroundExecutor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor); - - // start anti-entropy thread - backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), - initialDelaySec, periodSec, TimeUnit.SECONDS); - - // Create a distributed map for port stats. - KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder() - .register(KryoNamespaces.API) - .register(DefaultPortStatistics.class) - .register(DeviceId.class) - .register(MultiValuedTimestamp.class) - .register(WallClockTimestamp.class); - - devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder() - .withName("port-stats") - .withSerializer(deviceDataSerializer) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>> - eventuallyConsistentMapBuilder() - .withName("port-stats-delta") - .withSerializer(deviceDataSerializer) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - devicePortStats.addListener(portStatsListener); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - devicePortStats.destroy(); - devicePortDeltaStats.destroy(); - executor.shutdownNow(); - - backgroundExecutor.shutdownNow(); - try { - if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - log.error("Timeout during executor shutdown"); - } - } catch (InterruptedException e) { - log.error("Error during executor shutdown", e); - } - - deviceDescs.clear(); - devices.clear(); - devicePorts.clear(); - availableDevices.clear(); - log.info("Stopped"); - } - - @Override - public int getDeviceCount() { - return devices.size(); - } - - @Override - public Iterable<Device> getDevices() { - return Collections.unmodifiableCollection(devices.values()); - } - - @Override - public Iterable<Device> getAvailableDevices() { - return FluentIterable.from(getDevices()) - .filter(input -> isAvailable(input.id())); - } - - @Override - public Device getDevice(DeviceId deviceId) { - return devices.get(deviceId); - } - - @Override - public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, - DeviceId deviceId, - DeviceDescription deviceDescription) { - NodeId localNode = clusterService.getLocalNode().id(); - NodeId deviceNode = mastershipService.getMasterFor(deviceId); - - // Process device update only if we're the master, - // otherwise signal the actual master. - DeviceEvent deviceEvent = null; - if (localNode.equals(deviceNode)) { - - final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId); - final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); - final Timestamped<DeviceDescription> mergedDesc; - final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); - - synchronized (device) { - deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc); - mergedDesc = device.get(providerId).getDeviceDesc(); - } - - if (deviceEvent != null) { - log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); - notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc)); - } - - } else { - // FIXME Temporary hack for NPE (ONOS-1171). - // Proper fix is to implement forwarding to master on ConfigProvider - // redo ONOS-490 - if (deviceNode == null) { - // silently ignore - return null; - } - - - DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent( - providerId, deviceId, deviceDescription); - - // TODO check unicast return value - clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode); - /* error log: - log.warn("Failed to process injected device id: {} desc: {} " + - "(cluster messaging failed: {})", - deviceId, deviceDescription, e); - */ - } - - return deviceEvent; - } - - private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, - DeviceId deviceId, - Timestamped<DeviceDescription> deltaDesc) { - - // Collection of DeviceDescriptions for a Device - Map<ProviderId, DeviceDescriptions> device - = getOrCreateDeviceDescriptionsMap(deviceId); - - synchronized (device) { - // locking per device - - if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) { - log.debug("Ignoring outdated event: {}", deltaDesc); - return null; - } - - DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc); - - final Device oldDevice = devices.get(deviceId); - final Device newDevice; - - if (deltaDesc == descs.getDeviceDesc() || - deltaDesc.isNewer(descs.getDeviceDesc())) { - // on new device or valid update - descs.putDeviceDesc(deltaDesc); - newDevice = composeDevice(deviceId, device); - } else { - // outdated event, ignored. - return null; - } - if (oldDevice == null) { - // ADD - return createDevice(providerId, newDevice, deltaDesc.timestamp()); - } else { - // UPDATE or ignore (no change or stale) - return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp()); - } - } - } - - // Creates the device and returns the appropriate event if necessary. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent createDevice(ProviderId providerId, - Device newDevice, Timestamp timestamp) { - - // update composed device cache - Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); - verify(oldDevice == null, - "Unexpected Device in cache. PID:%s [old=%s, new=%s]", - providerId, oldDevice, newDevice); - - if (!providerId.isAncillary()) { - markOnline(newDevice.id(), timestamp); - } - - return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); - } - - // Updates the device and returns the appropriate event if necessary. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent updateDevice(ProviderId providerId, - Device oldDevice, - Device newDevice, Timestamp newTimestamp) { - // We allow only certain attributes to trigger update - boolean propertiesChanged = - !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || - !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) || - !Objects.equals(oldDevice.providerId(), newDevice.providerId()); - boolean annotationsChanged = - !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations()); - - // Primary providers can respond to all changes, but ancillary ones - // should respond only to annotation changes. - DeviceEvent event = null; - if ((providerId.isAncillary() && annotationsChanged) || - (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) { - boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice); - if (!replaced) { - verify(replaced, - "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", - providerId, oldDevice, devices.get(newDevice.id()) - , newDevice); - } - - event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); - } - - if (!providerId.isAncillary()) { - boolean wasOnline = availableDevices.contains(newDevice.id()); - markOnline(newDevice.id(), newTimestamp); - if (!wasOnline) { - notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null)); - } - } - return event; - } - - @Override - public DeviceEvent markOffline(DeviceId deviceId) { - final Timestamp timestamp = deviceClockService.getTimestamp(deviceId); - final DeviceEvent event = markOfflineInternal(deviceId, timestamp); - if (event != null) { - log.debug("Notifying peers of a device offline topology event for deviceId: {} {}", - deviceId, timestamp); - notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp)); - } - return event; - } - - private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { - - Map<ProviderId, DeviceDescriptions> providerDescs - = getOrCreateDeviceDescriptionsMap(deviceId); - - // locking device - synchronized (providerDescs) { - - // accept off-line if given timestamp is newer than - // the latest Timestamp from Primary provider - DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs); - Timestamp lastTimestamp = primDescs.getLatestTimestamp(); - if (timestamp.compareTo(lastTimestamp) <= 0) { - // outdated event ignore - return null; - } - - offline.put(deviceId, timestamp); - - Device device = devices.get(deviceId); - if (device == null) { - return null; - } - boolean removed = availableDevices.remove(deviceId); - if (removed) { - return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); - } - return null; - } - } - - /** - * Marks the device as available if the given timestamp is not outdated, - * compared to the time the device has been marked offline. - * - * @param deviceId identifier of the device - * @param timestamp of the event triggering this change. - * @return true if availability change request was accepted and changed the state - */ - // Guarded by deviceDescs value (=Device lock) - private boolean markOnline(DeviceId deviceId, Timestamp timestamp) { - // accept on-line if given timestamp is newer than - // the latest offline request Timestamp - Timestamp offlineTimestamp = offline.get(deviceId); - if (offlineTimestamp == null || - offlineTimestamp.compareTo(timestamp) < 0) { - - offline.remove(deviceId); - return availableDevices.add(deviceId); - } - return false; - } - - @Override - public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, - DeviceId deviceId, - List<PortDescription> portDescriptions) { - - NodeId localNode = clusterService.getLocalNode().id(); - // TODO: It might be negligible, but this will have negative impact to topology discovery performance, - // since it will trigger distributed store read. - // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc. - // outside Device subsystem. so that we don't have to modify both Device and Link stores. - // If we don't care much about topology performance, then it might be OK. - NodeId deviceNode = mastershipService.getMasterFor(deviceId); - - // Process port update only if we're the master of the device, - // otherwise signal the actual master. - List<DeviceEvent> deviceEvents = null; - if (localNode.equals(deviceNode)) { - - final Timestamp newTimestamp; - try { - newTimestamp = deviceClockService.getTimestamp(deviceId); - } catch (IllegalStateException e) { - log.info("Timestamp was not available for device {}", deviceId); - log.debug(" discarding {}", portDescriptions); - // Failed to generate timestamp. - - // Possible situation: - // Device connected and became master for short period of time, - // but lost mastership before this instance had the chance to - // retrieve term information. - - // Information dropped here is expected to be recoverable by - // device probing after mastership change - - return Collections.emptyList(); - } - log.debug("timestamp for {} {}", deviceId, newTimestamp); - - final Timestamped<List<PortDescription>> timestampedInput - = new Timestamped<>(portDescriptions, newTimestamp); - final Timestamped<List<PortDescription>> merged; - - final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); - - synchronized (device) { - deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput); - final DeviceDescriptions descs = device.get(providerId); - List<PortDescription> mergedList = - FluentIterable.from(portDescriptions) - .transform(input -> - // lookup merged port description - descs.getPortDesc(input.portNumber()).value() - ).toList(); - merged = new Timestamped<>(mergedList, newTimestamp); - } - - if (!deviceEvents.isEmpty()) { - log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); - notifyPeers(new InternalPortEvent(providerId, deviceId, merged)); - } - - } else { - // FIXME Temporary hack for NPE (ONOS-1171). - // Proper fix is to implement forwarding to master on ConfigProvider - // redo ONOS-490 - if (deviceNode == null) { - // silently ignore - return Collections.emptyList(); - } - - PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions); - - //TODO check unicast return value - clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode); - /* error log: - log.warn("Failed to process injected ports of device id: {} " + - "(cluster messaging failed: {})", - deviceId, e); - */ - } - - return deviceEvents == null ? Collections.emptyList() : deviceEvents; - } - - private List<DeviceEvent> updatePortsInternal(ProviderId providerId, - DeviceId deviceId, - Timestamped<List<PortDescription>> portDescriptions) { - - Device device = devices.get(deviceId); - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); - - Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); - checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - - List<DeviceEvent> events = new ArrayList<>(); - synchronized (descsMap) { - - if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) { - log.debug("Ignoring outdated events: {}", portDescriptions); - return Collections.emptyList(); - } - - DeviceDescriptions descs = descsMap.get(providerId); - // every provider must provide DeviceDescription. - checkArgument(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); - - Map<PortNumber, Port> ports = getPortMap(deviceId); - - final Timestamp newTimestamp = portDescriptions.timestamp(); - - // Add new ports - Set<PortNumber> processed = new HashSet<>(); - for (PortDescription portDescription : portDescriptions.value()) { - final PortNumber number = portDescription.portNumber(); - processed.add(number); - - final Port oldPort = ports.get(number); - final Port newPort; - - - final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); - if (existingPortDesc == null || - newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) { - // on new port or valid update - // update description - descs.putPortDesc(new Timestamped<>(portDescription, - portDescriptions.timestamp())); - newPort = composePort(device, number, descsMap); - } else { - // outdated event, ignored. - continue; - } - - events.add(oldPort == null ? - createPort(device, newPort, ports) : - updatePort(device, oldPort, newPort, ports)); - } - - events.addAll(pruneOldPorts(device, ports, processed)); - } - return FluentIterable.from(events).filter(notNull()).toList(); - } - - // Creates a new port based on the port description adds it to the map and - // Returns corresponding event. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent createPort(Device device, Port newPort, - Map<PortNumber, Port> ports) { - ports.put(newPort.number(), newPort); - return new DeviceEvent(PORT_ADDED, device, newPort); - } - - // Checks if the specified port requires update and if so, it replaces the - // existing entry in the map and returns corresponding event. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent updatePort(Device device, Port oldPort, - Port newPort, - Map<PortNumber, Port> ports) { - if (oldPort.isEnabled() != newPort.isEnabled() || - oldPort.type() != newPort.type() || - oldPort.portSpeed() != newPort.portSpeed() || - !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) { - ports.put(oldPort.number(), newPort); - return new DeviceEvent(PORT_UPDATED, device, newPort); - } - return null; - } - - // Prunes the specified list of ports based on which ports are in the - // processed list and returns list of corresponding events. - // Guarded by deviceDescs value (=Device lock) - private List<DeviceEvent> pruneOldPorts(Device device, - Map<PortNumber, Port> ports, - Set<PortNumber> processed) { - List<DeviceEvent> events = new ArrayList<>(); - Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<PortNumber, Port> e = iterator.next(); - PortNumber portNumber = e.getKey(); - if (!processed.contains(portNumber)) { - events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue())); - iterator.remove(); - } - } - return events; - } - - // Gets the map of ports for the specified device; if one does not already - // exist, it creates and registers a new one. - private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) { - return createIfAbsentUnchecked(devicePorts, deviceId, - NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); - } - - private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap( - DeviceId deviceId) { - Map<ProviderId, DeviceDescriptions> r; - r = deviceDescs.get(deviceId); - if (r == null) { - r = new HashMap<>(); - final Map<ProviderId, DeviceDescriptions> concurrentlyAdded; - concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r); - if (concurrentlyAdded != null) { - r = concurrentlyAdded; - } - } - return r; - } - - // Guarded by deviceDescs value (=Device lock) - private DeviceDescriptions getOrCreateProviderDeviceDescriptions( - Map<ProviderId, DeviceDescriptions> device, - ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) { - synchronized (device) { - DeviceDescriptions r = device.get(providerId); - if (r == null) { - r = new DeviceDescriptions(deltaDesc); - device.put(providerId, r); - } - return r; - } - } - - @Override - public synchronized DeviceEvent updatePortStatus(ProviderId providerId, - DeviceId deviceId, - PortDescription portDescription) { - final Timestamp newTimestamp; - try { - newTimestamp = deviceClockService.getTimestamp(deviceId); - } catch (IllegalStateException e) { - log.info("Timestamp was not available for device {}", deviceId); - log.debug(" discarding {}", portDescription); - // Failed to generate timestamp. Ignoring. - // See updatePorts comment - return null; - } - final Timestamped<PortDescription> deltaDesc - = new Timestamped<>(portDescription, newTimestamp); - final DeviceEvent event; - final Timestamped<PortDescription> mergedDesc; - final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); - synchronized (device) { - event = updatePortStatusInternal(providerId, deviceId, deltaDesc); - mergedDesc = device.get(providerId) - .getPortDesc(portDescription.portNumber()); - } - if (event != null) { - log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); - notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc)); - } - return event; - } - - private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId, - Timestamped<PortDescription> deltaDesc) { - Device device = devices.get(deviceId); - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); - - Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); - checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - - synchronized (descsMap) { - - if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) { - log.debug("Ignoring outdated event: {}", deltaDesc); - return null; - } - - DeviceDescriptions descs = descsMap.get(providerId); - // assuming all providers must to give DeviceDescription - verify(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); - - ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); - final PortNumber number = deltaDesc.value().portNumber(); - final Port oldPort = ports.get(number); - final Port newPort; - - final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); - if (existingPortDesc == null || - deltaDesc.isNewer(existingPortDesc)) { - // on new port or valid update - // update description - descs.putPortDesc(deltaDesc); - newPort = composePort(device, number, descsMap); - } else { - // same or outdated event, ignored. - log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc); - return null; - } - - if (oldPort == null) { - return createPort(device, newPort, ports); - } else { - return updatePort(device, oldPort, newPort, ports); - } - } - } - - @Override - public List<Port> getPorts(DeviceId deviceId) { - Map<PortNumber, Port> ports = devicePorts.get(deviceId); - if (ports == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(ports.values()); - } - - @Override - public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId, - Collection<PortStatistics> newStatsCollection) { - - Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId); - Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap(); - Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap(); - - if (prvStatsMap != null) { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - PortStatistics prvStats = prvStatsMap.get(port); - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - PortStatistics deltaStats = builder.build(); - if (prvStats != null) { - deltaStats = calcDeltaStats(deviceId, prvStats, newStats); - } - deltaStatsMap.put(port, deltaStats); - newStatsMap.put(port, newStats); - } - } else { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - newStatsMap.put(port, newStats); - } - } - devicePortDeltaStats.put(deviceId, deltaStatsMap); - devicePortStats.put(deviceId, newStatsMap); - // DeviceEvent returns null because of InternalPortStatsListener usage - return null; - } - - /** - * Calculate delta statistics by subtracting previous from new statistics. - * - * @param deviceId device identifier - * @param prvStats previous port statistics - * @param newStats new port statistics - * @return PortStatistics - */ - public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) { - // calculate time difference - long deltaStatsSec, deltaStatsNano; - if (newStats.durationNano() < prvStats.durationNano()) { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L; - } else { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano(); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec(); - } - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId) - .setPort(newStats.port()) - .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived()) - .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent()) - .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived()) - .setBytesSent(newStats.bytesSent() - prvStats.bytesSent()) - .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped()) - .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped()) - .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors()) - .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors()) - .setDurationSec(deltaStatsSec) - .setDurationNano(deltaStatsNano) - .build(); - return deltaStats; - } - - @Override - public List<PortStatistics> getPortStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public Port getPort(DeviceId deviceId, PortNumber portNumber) { - Map<PortNumber, Port> ports = devicePorts.get(deviceId); - return ports == null ? null : ports.get(portNumber); - } - - @Override - public boolean isAvailable(DeviceId deviceId) { - return availableDevices.contains(deviceId); - } - - @Override - public synchronized DeviceEvent removeDevice(DeviceId deviceId) { - final NodeId myId = clusterService.getLocalNode().id(); - NodeId master = mastershipService.getMasterFor(deviceId); - - // if there exist a master, forward - // if there is no master, try to become one and process - - boolean relinquishAtEnd = false; - if (master == null) { - final MastershipRole myRole = mastershipService.getLocalRole(deviceId); - if (myRole != MastershipRole.NONE) { - relinquishAtEnd = true; - } - log.debug("Temporarily requesting role for {} to remove", deviceId); - mastershipService.requestRoleFor(deviceId); - MastershipTerm term = termService.getMastershipTerm(deviceId); - if (term != null && myId.equals(term.master())) { - master = myId; - } - } - - if (!myId.equals(master)) { - log.debug("{} has control of {}, forwarding remove request", - master, deviceId); - - // TODO check unicast return value - clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master); - /* error log: - log.error("Failed to forward {} remove request to {}", deviceId, master, e); - */ - - // event will be triggered after master processes it. - return null; - } - - // I have control.. - - Timestamp timestamp = deviceClockService.getTimestamp(deviceId); - DeviceEvent event = removeDeviceInternal(deviceId, timestamp); - if (event != null) { - log.debug("Notifying peers of a device removed topology event for deviceId: {}", - deviceId); - notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp)); - } - if (relinquishAtEnd) { - log.debug("Relinquishing temporary role acquired for {}", deviceId); - mastershipService.relinquishMastership(deviceId); - } - return event; - } - - private DeviceEvent removeDeviceInternal(DeviceId deviceId, - Timestamp timestamp) { - - Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId); - synchronized (descs) { - // accept removal request if given timestamp is newer than - // the latest Timestamp from Primary provider - DeviceDescriptions primDescs = getPrimaryDescriptions(descs); - if (primDescs == null) { - return null; - } - - Timestamp lastTimestamp = primDescs.getLatestTimestamp(); - if (timestamp.compareTo(lastTimestamp) <= 0) { - // outdated event ignore - return null; - } - removalRequest.put(deviceId, timestamp); - - Device device = devices.remove(deviceId); - // should DEVICE_REMOVED carry removed ports? - Map<PortNumber, Port> ports = devicePorts.get(deviceId); - if (ports != null) { - ports.clear(); - } - markOfflineInternal(deviceId, timestamp); - descs.clear(); - return device == null ? null : - new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null); - } - } - - /** - * Checks if given timestamp is superseded by removal request - * with more recent timestamp. - * - * @param deviceId identifier of a device - * @param timestampToCheck timestamp of an event to check - * @return true if device is already removed - */ - private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) { - Timestamp removalTimestamp = removalRequest.get(deviceId); - if (removalTimestamp != null && - removalTimestamp.compareTo(timestampToCheck) >= 0) { - // removalRequest is more recent - return true; - } - return false; - } - - /** - * Returns a Device, merging description given from multiple Providers. - * - * @param deviceId device identifier - * @param providerDescs Collection of Descriptions from multiple providers - * @return Device instance - */ - private Device composeDevice(DeviceId deviceId, - Map<ProviderId, DeviceDescriptions> providerDescs) { - - checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied"); - - ProviderId primary = pickPrimaryPid(providerDescs); - - DeviceDescriptions desc = providerDescs.get(primary); - - final DeviceDescription base = desc.getDeviceDesc().value(); - Type type = base.type(); - String manufacturer = base.manufacturer(); - String hwVersion = base.hwVersion(); - String swVersion = base.swVersion(); - String serialNumber = base.serialNumber(); - ChassisId chassisId = base.chassisId(); - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - annotations = merge(annotations, base.annotations()); - - for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { - if (e.getKey().equals(primary)) { - continue; - } - // Note: should keep track of Description timestamp in the future - // and only merge conflicting keys when timestamp is newer. - // Currently assuming there will never be a key conflict between - // providers - - // annotation merging. not so efficient, should revisit later - annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations()); - } - - return new DefaultDevice(primary, deviceId, type, manufacturer, - hwVersion, swVersion, serialNumber, - chassisId, annotations); - } - - private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled, - PortDescription description, Annotations annotations) { - switch (description.type()) { - case OMS: - OmsPortDescription omsDesc = (OmsPortDescription) description; - return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(), - omsDesc.maxFrequency(), omsDesc.grid(), annotations); - case OCH: - OchPortDescription ochDesc = (OchPortDescription) description; - return new OchPort(device, number, isEnabled, ochDesc.signalType(), - ochDesc.isTunable(), ochDesc.lambda(), annotations); - case ODUCLT: - OduCltPortDescription oduDesc = (OduCltPortDescription) description; - return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations); - default: - return new DefaultPort(device, number, isEnabled, description.type(), - description.portSpeed(), annotations); - } - } - - /** - * Returns a Port, merging description given from multiple Providers. - * - * @param device device the port is on - * @param number port number - * @param descsMap Collection of Descriptions from multiple providers - * @return Port instance - */ - private Port composePort(Device device, PortNumber number, - Map<ProviderId, DeviceDescriptions> descsMap) { - - ProviderId primary = pickPrimaryPid(descsMap); - DeviceDescriptions primDescs = descsMap.get(primary); - // if no primary, assume not enabled - boolean isEnabled = false; - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - Timestamp newest = null; - final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number); - if (portDesc != null) { - isEnabled = portDesc.value().isEnabled(); - annotations = merge(annotations, portDesc.value().annotations()); - newest = portDesc.timestamp(); - } - Port updated = null; - for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) { - if (e.getKey().equals(primary)) { - continue; - } - // Note: should keep track of Description timestamp in the future - // and only merge conflicting keys when timestamp is newer. - // Currently assuming there will never be a key conflict between - // providers - - // annotation merging. not so efficient, should revisit later - final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number); - if (otherPortDesc != null) { - if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) { - continue; - } - annotations = merge(annotations, otherPortDesc.value().annotations()); - PortDescription other = otherPortDesc.value(); - updated = buildTypedPort(device, number, isEnabled, other, annotations); - newest = otherPortDesc.timestamp(); - } - } - if (portDesc == null) { - return updated == null ? new DefaultPort(device, number, false, annotations) : updated; - } - PortDescription current = portDesc.value(); - return updated == null - ? buildTypedPort(device, number, isEnabled, current, annotations) - : updated; - } - - /** - * @return primary ProviderID, or randomly chosen one if none exists - */ - private ProviderId pickPrimaryPid( - Map<ProviderId, DeviceDescriptions> providerDescs) { - ProviderId fallBackPrimary = null; - for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { - if (!e.getKey().isAncillary()) { - return e.getKey(); - } else if (fallBackPrimary == null) { - // pick randomly as a fallback in case there is no primary - fallBackPrimary = e.getKey(); - } - } - return fallBackPrimary; - } - - private DeviceDescriptions getPrimaryDescriptions( - Map<ProviderId, DeviceDescriptions> providerDescs) { - ProviderId pid = pickPrimaryPid(providerDescs); - return providerDescs.get(pid); - } - - private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException { - clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient); - } - - private void broadcastMessage(MessageSubject subject, Object event) { - clusterCommunicator.broadcast(event, subject, SERIALIZER::encode); - } - - private void notifyPeers(InternalDeviceEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event); - } - - private void notifyPeers(InternalDeviceOfflineEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event); - } - - private void notifyPeers(InternalDeviceRemovedEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event); - } - - private void notifyPeers(InternalPortEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event); - } - - private void notifyPeers(InternalPortStatusEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event); - } - - private void notifyPeer(NodeId recipient, InternalDeviceEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalPortEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private DeviceAntiEntropyAdvertisement createAdvertisement() { - final NodeId self = clusterService.getLocalNode().id(); - - final int numDevices = deviceDescs.size(); - Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices); - final int portsPerDevice = 8; // random factor to minimize reallocation - Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice); - Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices); - - deviceDescs.forEach((deviceId, devDescs) -> { - - // for each Device... - synchronized (devDescs) { - - // send device offline timestamp - Timestamp lOffline = this.offline.get(deviceId); - if (lOffline != null) { - adOffline.put(deviceId, lOffline); - } - - for (Entry<ProviderId, DeviceDescriptions> - prov : devDescs.entrySet()) { - - // for each Provider Descriptions... - final ProviderId provId = prov.getKey(); - final DeviceDescriptions descs = prov.getValue(); - - adDevices.put(new DeviceFragmentId(deviceId, provId), - descs.getDeviceDesc().timestamp()); - - for (Entry<PortNumber, Timestamped<PortDescription>> - portDesc : descs.getPortDescs().entrySet()) { - - final PortNumber number = portDesc.getKey(); - adPorts.put(new PortFragmentId(deviceId, provId, number), - portDesc.getValue().timestamp()); - } - } - } - }); - - return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline); - } - - /** - * Responds to anti-entropy advertisement message. - * <p/> - * Notify sender about out-dated information using regular replication message. - * Send back advertisement to sender if not in sync. - * - * @param advertisement to respond to - */ - private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) { - - final NodeId sender = advertisement.sender(); - - Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints()); - Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports()); - Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline()); - - // Fragments to request - Collection<DeviceFragmentId> reqDevices = new ArrayList<>(); - Collection<PortFragmentId> reqPorts = new ArrayList<>(); - - for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) { - final DeviceId deviceId = de.getKey(); - final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue(); - - synchronized (lDevice) { - // latestTimestamp across provider - // Note: can be null initially - Timestamp localLatest = offline.get(deviceId); - - // handle device Ads - for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) { - final ProviderId provId = prov.getKey(); - final DeviceDescriptions lDeviceDescs = prov.getValue(); - - final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId); - - - Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc(); - Timestamp advDevTimestamp = devAds.get(devFragId); - - if (advDevTimestamp == null || lProvDevice.isNewerThan( - advDevTimestamp)) { - // remote does not have it or outdated, suggest - notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice)); - } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) { - // local is outdated, request - reqDevices.add(devFragId); - } - - // handle port Ads - for (Entry<PortNumber, Timestamped<PortDescription>> - pe : lDeviceDescs.getPortDescs().entrySet()) { - - final PortNumber num = pe.getKey(); - final Timestamped<PortDescription> lPort = pe.getValue(); - - final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num); - - Timestamp advPortTimestamp = portAds.get(portFragId); - if (advPortTimestamp == null || lPort.isNewerThan( - advPortTimestamp)) { - // remote does not have it or outdated, suggest - notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort)); - } else if (!lPort.timestamp().equals(advPortTimestamp)) { - // local is outdated, request - log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp); - reqPorts.add(portFragId); - } - - // remove port Ad already processed - portAds.remove(portFragId); - } // end local port loop - - // remove device Ad already processed - devAds.remove(devFragId); - - // find latest and update - final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp(); - if (localLatest == null || - providerLatest.compareTo(localLatest) > 0) { - localLatest = providerLatest; - } - } // end local provider loop - - // checking if remote timestamp is more recent. - Timestamp rOffline = offlineAds.get(deviceId); - if (rOffline != null && - rOffline.compareTo(localLatest) > 0) { - // remote offline timestamp suggests that the - // device is off-line - markOfflineInternal(deviceId, rOffline); - } - - Timestamp lOffline = offline.get(deviceId); - if (lOffline != null && rOffline == null) { - // locally offline, but remote is online, suggest offline - notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline)); - } - - // remove device offline Ad already processed - offlineAds.remove(deviceId); - } // end local device loop - } // device lock - - // If there is any Ads left, request them - log.trace("Ads left {}, {}", devAds, portAds); - reqDevices.addAll(devAds.keySet()); - reqPorts.addAll(portAds.keySet()); - - if (reqDevices.isEmpty() && reqPorts.isEmpty()) { - log.trace("Nothing to request to remote peer {}", sender); - return; - } - - log.debug("Need to sync {} {}", reqDevices, reqPorts); - - // 2-way Anti-Entropy for now - try { - unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement()); - } catch (IOException e) { - log.error("Failed to send response advertisement to " + sender, e); - } - -// Sketch of 3-way Anti-Entropy -// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts); -// ClusterMessage message = new ClusterMessage( -// clusterService.getLocalNode().id(), -// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST, -// SERIALIZER.encode(request)); -// -// try { -// clusterCommunicator.unicast(message, advertisement.sender()); -// } catch (IOException e) { -// log.error("Failed to send advertisement reply to " -// + advertisement.sender(), e); -// } - } - - private void notifyDelegateIfNotNull(DeviceEvent event) { - if (event != null) { - notifyDelegate(event); - } - } - - private final class SendAdvertisementTask implements Runnable { - - @Override - public void run() { - if (Thread.currentThread().isInterrupted()) { - log.debug("Interrupted, quitting"); - return; - } - - try { - final NodeId self = clusterService.getLocalNode().id(); - Set<ControllerNode> nodes = clusterService.getNodes(); - - ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes) - .transform(toNodeId()) - .toList(); - - if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) { - log.trace("No other peers in the cluster."); - return; - } - - NodeId peer; - do { - int idx = RandomUtils.nextInt(0, nodeIds.size()); - peer = nodeIds.get(idx); - } while (peer.equals(self)); - - DeviceAntiEntropyAdvertisement ad = createAdvertisement(); - - if (Thread.currentThread().isInterrupted()) { - log.debug("Interrupted, quitting"); - return; - } - - try { - unicastMessage(peer, DEVICE_ADVERTISE, ad); - } catch (IOException e) { - log.debug("Failed to send anti-entropy advertisement to {}", peer); - return; - } - } catch (Exception e) { - // catch all Exception to avoid Scheduled task being suppressed. - log.error("Exception thrown while sending advertisement", e); - } - } - } - - private final class InternalDeviceEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device update event from peer: {}", message.sender()); - InternalDeviceEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); - - try { - notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription)); - } catch (Exception e) { - log.warn("Exception thrown handling device update", e); - } - } - } - - private final class InternalDeviceOfflineEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device offline event from peer: {}", message.sender()); - InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload()); - - DeviceId deviceId = event.deviceId(); - Timestamp timestamp = event.timestamp(); - - try { - notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp)); - } catch (Exception e) { - log.warn("Exception thrown handling device offline", e); - } - } - } - - private final class InternalRemoveRequestListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device remove request from peer: {}", message.sender()); - DeviceId did = SERIALIZER.decode(message.payload()); - - try { - removeDevice(did); - } catch (Exception e) { - log.warn("Exception thrown handling device remove", e); - } - } - } - - private final class InternalDeviceRemovedEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device removed event from peer: {}", message.sender()); - InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload()); - - DeviceId deviceId = event.deviceId(); - Timestamp timestamp = event.timestamp(); - - try { - notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp)); - } catch (Exception e) { - log.warn("Exception thrown handling device removed", e); - } - } - } - - private final class InternalPortEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - - log.debug("Received port update event from peer: {}", message.sender()); - InternalPortEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions(); - - if (getDevice(deviceId) == null) { - log.debug("{} not found on this node yet, ignoring.", deviceId); - // Note: dropped information will be recovered by anti-entropy - return; - } - - try { - notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions)); - } catch (Exception e) { - log.warn("Exception thrown handling port update", e); - } - } - } - - private final class InternalPortStatusEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - - log.debug("Received port status update event from peer: {}", message.sender()); - InternalPortStatusEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - Timestamped<PortDescription> portDescription = event.portDescription(); - - if (getDevice(deviceId) == null) { - log.debug("{} not found on this node yet, ignoring.", deviceId); - // Note: dropped information will be recovered by anti-entropy - return; - } - - try { - notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription)); - } catch (Exception e) { - log.warn("Exception thrown handling port update", e); - } - } - } - - private final class InternalDeviceAdvertisementListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender()); - DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); - try { - handleAdvertisement(advertisement); - } catch (Exception e) { - log.warn("Exception thrown handling Device advertisements.", e); - } - } - } - - private final class DeviceInjectedEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received injected device event from peer: {}", message.sender()); - DeviceInjectedEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - DeviceDescription deviceDescription = event.deviceDescription(); - if (!deviceClockService.isTimestampAvailable(deviceId)) { - // workaround for ONOS-1208 - log.warn("Not ready to accept update. Dropping {}", deviceDescription); - return; - } - - try { - createOrUpdateDevice(providerId, deviceId, deviceDescription); - } catch (Exception e) { - log.warn("Exception thrown handling device injected event.", e); - } - } - } - - private final class PortInjectedEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received injected port event from peer: {}", message.sender()); - PortInjectedEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - List<PortDescription> portDescriptions = event.portDescriptions(); - if (!deviceClockService.isTimestampAvailable(deviceId)) { - // workaround for ONOS-1208 - log.warn("Not ready to accept update. Dropping {}", portDescriptions); - return; - } - - try { - updatePorts(providerId, deviceId, portDescriptions); - } catch (Exception e) { - log.warn("Exception thrown handling port injected event.", e); - } - } - } - - private class InternalPortStatsListener - implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> { - @Override - public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) { - if (event.type() == PUT) { - Device device = devices.get(event.key()); - if (device != null) { - delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device)); - } - } - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java deleted file mode 100644 index 554faf91..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.store.cluster.messaging.MessageSubject; - -/** - * MessageSubjects used by GossipDeviceStore peer-peer communication. - */ -public final class GossipDeviceStoreMessageSubjects { - - private GossipDeviceStoreMessageSubjects() {} - - public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update"); - public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline"); - public static final MessageSubject DEVICE_REMOVE_REQ = new MessageSubject("peer-device-remove-request"); - public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed"); - public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update"); - public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update"); - - public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements"); - // to be used with 3-way anti-entropy process - public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request"); - - // Network elements injected (not discovered) by ConfigProvider - public static final MessageSubject DEVICE_INJECTED = new MessageSubject("peer-device-injected"); - public static final MessageSubject PORT_INJECTED = new MessageSubject("peer-port-injected"); -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceEvent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceEvent.java deleted file mode 100644 index 6916a3ed..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceEvent.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.impl.Timestamped; - -import com.google.common.base.MoreObjects; - -/** - * Information published by GossipDeviceStore to notify peers of a device - * change event. - */ -public class InternalDeviceEvent { - - private final ProviderId providerId; - private final DeviceId deviceId; - private final Timestamped<DeviceDescription> deviceDescription; - - protected InternalDeviceEvent( - ProviderId providerId, - DeviceId deviceId, - Timestamped<DeviceDescription> deviceDescription) { - this.providerId = providerId; - this.deviceId = deviceId; - this.deviceDescription = deviceDescription; - } - - public DeviceId deviceId() { - return deviceId; - } - - public ProviderId providerId() { - return providerId; - } - - public Timestamped<DeviceDescription> deviceDescription() { - return deviceDescription; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .add("deviceDescription", deviceDescription) - .toString(); - } - - // for serializer - protected InternalDeviceEvent() { - this.providerId = null; - this.deviceId = null; - this.deviceDescription = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceEventSerializer.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceEventSerializer.java deleted file mode 100644 index d5fbde7e..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceEventSerializer.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.impl.Timestamped; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -/** - * Kryo Serializer for {@link InternalDeviceEvent}. - */ -public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> { - - /** - * Creates a serializer for {@link InternalDeviceEvent}. - */ - public InternalDeviceEventSerializer() { - // does not accept null - super(false); - } - - @Override - public void write(Kryo kryo, Output output, InternalDeviceEvent event) { - kryo.writeClassAndObject(output, event.providerId()); - kryo.writeClassAndObject(output, event.deviceId()); - kryo.writeClassAndObject(output, event.deviceDescription()); - } - - @Override - public InternalDeviceEvent read(Kryo kryo, Input input, - Class<InternalDeviceEvent> type) { - ProviderId providerId = (ProviderId) kryo.readClassAndObject(input); - DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input); - - @SuppressWarnings("unchecked") - Timestamped<DeviceDescription> deviceDescription - = (Timestamped<DeviceDescription>) kryo.readClassAndObject(input); - - return new InternalDeviceEvent(providerId, deviceId, deviceDescription); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceOfflineEvent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceOfflineEvent.java deleted file mode 100644 index 0546c139..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceOfflineEvent.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.net.DeviceId; -import org.onosproject.store.Timestamp; - -import com.google.common.base.MoreObjects; - -/** - * Information published by GossipDeviceStore to notify peers of a device - * going offline. - */ -public class InternalDeviceOfflineEvent { - - private final DeviceId deviceId; - private final Timestamp timestamp; - - /** - * Creates a InternalDeviceOfflineEvent. - * @param deviceId identifier of device going offline. - * @param timestamp timestamp of when the device went offline. - */ - public InternalDeviceOfflineEvent(DeviceId deviceId, Timestamp timestamp) { - this.deviceId = deviceId; - this.timestamp = timestamp; - } - - public DeviceId deviceId() { - return deviceId; - } - - public Timestamp timestamp() { - return timestamp; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("deviceId", deviceId) - .add("timestamp", timestamp) - .toString(); - } - - // for serializer - @SuppressWarnings("unused") - private InternalDeviceOfflineEvent() { - deviceId = null; - timestamp = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceOfflineEventSerializer.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceOfflineEventSerializer.java deleted file mode 100644 index 7f3c7bcf..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceOfflineEventSerializer.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.net.DeviceId; -import org.onosproject.store.Timestamp; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -/** - * Kryo Serializer for {@link InternalDeviceOfflineEvent}. - */ -public class InternalDeviceOfflineEventSerializer extends Serializer<InternalDeviceOfflineEvent> { - - /** - * Creates a serializer for {@link InternalDeviceOfflineEvent}. - */ - public InternalDeviceOfflineEventSerializer() { - // does not accept null - super(false); - } - - @Override - public void write(Kryo kryo, Output output, InternalDeviceOfflineEvent event) { - kryo.writeClassAndObject(output, event.deviceId()); - kryo.writeClassAndObject(output, event.timestamp()); - } - - @Override - public InternalDeviceOfflineEvent read(Kryo kryo, Input input, - Class<InternalDeviceOfflineEvent> type) { - DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input); - Timestamp timestamp = (Timestamp) kryo.readClassAndObject(input); - - return new InternalDeviceOfflineEvent(deviceId, timestamp); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceRemovedEvent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceRemovedEvent.java deleted file mode 100644 index e9f4f06a..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalDeviceRemovedEvent.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.net.DeviceId; -import org.onosproject.store.Timestamp; - -import com.google.common.base.MoreObjects; - -/** - * Information published by GossipDeviceStore to notify peers of a device - * being administratively removed. - */ -public class InternalDeviceRemovedEvent { - - private final DeviceId deviceId; - private final Timestamp timestamp; - - /** - * Creates a InternalDeviceRemovedEvent. - * @param deviceId identifier of the removed device. - * @param timestamp timestamp of when the device was administratively removed. - */ - public InternalDeviceRemovedEvent(DeviceId deviceId, Timestamp timestamp) { - this.deviceId = deviceId; - this.timestamp = timestamp; - } - - public DeviceId deviceId() { - return deviceId; - } - - public Timestamp timestamp() { - return timestamp; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("deviceId", deviceId) - .add("timestamp", timestamp) - .toString(); - } - - // for serializer - @SuppressWarnings("unused") - private InternalDeviceRemovedEvent() { - deviceId = null; - timestamp = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortEvent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortEvent.java deleted file mode 100644 index f92fb115..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortEvent.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import java.util.List; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.impl.Timestamped; - -import com.google.common.base.MoreObjects; - -/** - * Information published by GossipDeviceStore to notify peers of a port - * change event. - */ -public class InternalPortEvent { - - private final ProviderId providerId; - private final DeviceId deviceId; - private final Timestamped<List<PortDescription>> portDescriptions; - - protected InternalPortEvent( - ProviderId providerId, - DeviceId deviceId, - Timestamped<List<PortDescription>> portDescriptions) { - this.providerId = providerId; - this.deviceId = deviceId; - this.portDescriptions = portDescriptions; - } - - public DeviceId deviceId() { - return deviceId; - } - - public ProviderId providerId() { - return providerId; - } - - public Timestamped<List<PortDescription>> portDescriptions() { - return portDescriptions; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .add("portDescriptions", portDescriptions) - .toString(); - } - - // for serializer - protected InternalPortEvent() { - this.providerId = null; - this.deviceId = null; - this.portDescriptions = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortEventSerializer.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortEventSerializer.java deleted file mode 100644 index 0acd703f..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortEventSerializer.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import java.util.List; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.impl.Timestamped; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -/** - * Kryo Serializer for {@link InternalPortEvent}. - */ -public class InternalPortEventSerializer extends Serializer<InternalPortEvent> { - - /** - * Creates a serializer for {@link InternalPortEvent}. - */ - public InternalPortEventSerializer() { - // does not accept null - super(false); - } - - @Override - public void write(Kryo kryo, Output output, InternalPortEvent event) { - kryo.writeClassAndObject(output, event.providerId()); - kryo.writeClassAndObject(output, event.deviceId()); - kryo.writeClassAndObject(output, event.portDescriptions()); - } - - @Override - public InternalPortEvent read(Kryo kryo, Input input, - Class<InternalPortEvent> type) { - ProviderId providerId = (ProviderId) kryo.readClassAndObject(input); - DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input); - - @SuppressWarnings("unchecked") - Timestamped<List<PortDescription>> portDescriptions - = (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input); - - return new InternalPortEvent(providerId, deviceId, portDescriptions); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortStatusEvent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortStatusEvent.java deleted file mode 100644 index f1781693..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortStatusEvent.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.impl.Timestamped; - -import com.google.common.base.MoreObjects; - -/** - * Information published by GossipDeviceStore to notify peers of a port - * status change event. - */ -public class InternalPortStatusEvent { - - private final ProviderId providerId; - private final DeviceId deviceId; - private final Timestamped<PortDescription> portDescription; - - protected InternalPortStatusEvent( - ProviderId providerId, - DeviceId deviceId, - Timestamped<PortDescription> portDescription) { - this.providerId = providerId; - this.deviceId = deviceId; - this.portDescription = portDescription; - } - - public DeviceId deviceId() { - return deviceId; - } - - public ProviderId providerId() { - return providerId; - } - - public Timestamped<PortDescription> portDescription() { - return portDescription; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .add("portDescription", portDescription) - .toString(); - } - - // for serializer - protected InternalPortStatusEvent() { - this.providerId = null; - this.deviceId = null; - this.portDescription = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortStatusEventSerializer.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortStatusEventSerializer.java deleted file mode 100644 index 32ee3915..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/InternalPortStatusEventSerializer.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.impl.Timestamped; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -/** - * Kryo Serializer for {@link InternalPortStatusEvent}. - */ -public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> { - - /** - * Creates a serializer for {@link InternalPortStatusEvent}. - */ - public InternalPortStatusEventSerializer() { - // does not accept null - super(false); - } - - @Override - public void write(Kryo kryo, Output output, InternalPortStatusEvent event) { - kryo.writeClassAndObject(output, event.providerId()); - kryo.writeClassAndObject(output, event.deviceId()); - kryo.writeClassAndObject(output, event.portDescription()); - } - - @Override - public InternalPortStatusEvent read(Kryo kryo, Input input, - Class<InternalPortStatusEvent> type) { - ProviderId providerId = (ProviderId) kryo.readClassAndObject(input); - DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input); - @SuppressWarnings("unchecked") - Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input); - - return new InternalPortStatusEvent(providerId, deviceId, portDescription); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortFragmentId.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortFragmentId.java deleted file mode 100644 index 1ff05198..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortFragmentId.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import java.util.Objects; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.PortNumber; -import org.onosproject.net.provider.ProviderId; - -import com.google.common.base.MoreObjects; - -/** - * Identifier for PortDescription from a Provider. - */ -public final class PortFragmentId { - public final ProviderId providerId; - public final DeviceId deviceId; - public final PortNumber portNumber; - - public PortFragmentId(DeviceId deviceId, ProviderId providerId, - PortNumber portNumber) { - this.providerId = providerId; - this.deviceId = deviceId; - this.portNumber = portNumber; - } - - @Override - public int hashCode() { - return Objects.hash(providerId, deviceId, portNumber); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof PortFragmentId)) { - return false; - } - PortFragmentId that = (PortFragmentId) obj; - return Objects.equals(this.deviceId, that.deviceId) && - Objects.equals(this.portNumber, that.portNumber) && - Objects.equals(this.providerId, that.providerId); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .add("portNumber", portNumber) - .toString(); - } - - // for serializer - @SuppressWarnings("unused") - private PortFragmentId() { - this.providerId = null; - this.deviceId = null; - this.portNumber = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java deleted file mode 100644 index 971f53db..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import com.google.common.base.MoreObjects; -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.provider.ProviderId; - -import java.util.List; - -public class PortInjectedEvent { - - private ProviderId providerId; - private DeviceId deviceId; - private List<PortDescription> portDescriptions; - - protected PortInjectedEvent(ProviderId providerId, DeviceId deviceId, List<PortDescription> portDescriptions) { - this.providerId = providerId; - this.deviceId = deviceId; - this.portDescriptions = portDescriptions; - } - - public DeviceId deviceId() { - return deviceId; - } - - public ProviderId providerId() { - return providerId; - } - - public List<PortDescription> portDescriptions() { - return portDescriptions; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .add("portDescriptions", portDescriptions) - .toString(); - } - - // for serializer - protected PortInjectedEvent() { - this.providerId = null; - this.deviceId = null; - this.portDescriptions = null; - } - -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortKey.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortKey.java deleted file mode 100644 index 62b09952..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortKey.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import java.util.Objects; - -import org.onosproject.net.DeviceId; -import org.onosproject.net.PortNumber; -import org.onosproject.net.provider.ProviderId; - -import com.google.common.base.MoreObjects; - -/** - * Key for PortDescriptions in ECDeviceStore. - */ -public class PortKey { - private final ProviderId providerId; - private final DeviceId deviceId; - private final PortNumber portNumber; - - public PortKey(ProviderId providerId, DeviceId deviceId, PortNumber portNumber) { - this.providerId = providerId; - this.deviceId = deviceId; - this.portNumber = portNumber; - } - - public ProviderId providerId() { - return providerId; - } - - public DeviceId deviceId() { - return deviceId; - } - - public PortNumber portNumber() { - return portNumber; - } - - @Override - public int hashCode() { - return Objects.hash(providerId, deviceId, portNumber); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof PortKey)) { - return false; - } - PortKey that = (PortKey) obj; - return Objects.equals(this.deviceId, that.deviceId) && - Objects.equals(this.providerId, that.providerId) && - Objects.equals(this.portNumber, that.portNumber); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("providerId", providerId) - .add("deviceId", deviceId) - .add("portNumber", portNumber) - .toString(); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/package-info.java deleted file mode 100644 index 29df62ec..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of distributed device store using p2p synchronization protocol. - */ -package org.onosproject.store.device.impl; |