diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java | 1672 |
1 files changed, 0 insertions, 1672 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java deleted file mode 100644 index a9a9098e..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java +++ /dev/null @@ -1,1672 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.apache.commons.lang3.RandomUtils; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.packet.ChassisId; -import org.onlab.util.KryoNamespace; -import org.onlab.util.NewConcurrentHashMap; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.NodeId; -import org.onosproject.mastership.MastershipService; -import org.onosproject.mastership.MastershipTerm; -import org.onosproject.mastership.MastershipTermService; -import org.onosproject.net.Annotations; -import org.onosproject.net.AnnotationsUtil; -import org.onosproject.net.DefaultAnnotations; -import org.onosproject.net.DefaultDevice; -import org.onosproject.net.DefaultPort; -import org.onosproject.net.Device; -import org.onosproject.net.Device.Type; -import org.onosproject.net.DeviceId; -import org.onosproject.net.MastershipRole; -import org.onosproject.net.OchPort; -import org.onosproject.net.OduCltPort; -import org.onosproject.net.OmsPort; -import org.onosproject.net.Port; -import org.onosproject.net.PortNumber; -import org.onosproject.net.device.DefaultPortStatistics; -import org.onosproject.net.device.DeviceClockService; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.device.DeviceEvent; -import org.onosproject.net.device.DeviceStore; -import org.onosproject.net.device.DeviceStoreDelegate; -import org.onosproject.net.device.OchPortDescription; -import org.onosproject.net.device.OduCltPortDescription; -import org.onosproject.net.device.OmsPortDescription; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.device.PortStatistics; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.AbstractStore; -import org.onosproject.store.Timestamp; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.ClusterMessage; -import org.onosproject.store.cluster.messaging.ClusterMessageHandler; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.impl.Timestamped; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.serializers.custom.DistributedStoreSerializers; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.MultiValuedTimestamp; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.WallClockTimestamp; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Predicates.notNull; -import static com.google.common.base.Verify.verify; -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; -import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; -import static org.onlab.util.Tools.groupedThreads; -import static org.onlab.util.Tools.minPriority; -import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId; -import static org.onosproject.net.DefaultAnnotations.merge; -import static org.onosproject.net.device.DeviceEvent.Type.*; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.slf4j.LoggerFactory.getLogger; - -/** - * Manages inventory of infrastructure devices using gossip protocol to distribute - * information. - */ -@Component(immediate = true) -@Service -public class GossipDeviceStore - extends AbstractStore<DeviceEvent, DeviceStoreDelegate> - implements DeviceStore { - - private final Logger log = getLogger(getClass()); - - private static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; - // Timeout in milliseconds to process device or ports on remote master node - private static final int REMOTE_MASTER_TIMEOUT = 1000; - - // innerMap is used to lock a Device, thus instance should never be replaced. - // collection of Description given from various providers - private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>> - deviceDescs = Maps.newConcurrentMap(); - - // cache of Device and Ports generated by compositing descriptions from providers - private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap(); - private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap(); - - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats; - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats; - private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> - portStatsListener = new InternalPortStatsListener(); - - // to be updated under Device lock - private final Map<DeviceId, Timestamp> offline = Maps.newHashMap(); - private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap(); - - // available(=UP) devices - private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet(); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected DeviceClockService deviceClockService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipTermService termService; - - - protected static final KryoSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(DistributedStoreSerializers.STORE_COMMON) - .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) - .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class) - .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class) - .register(InternalDeviceRemovedEvent.class) - .register(new InternalPortEventSerializer(), InternalPortEvent.class) - .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class) - .register(DeviceAntiEntropyAdvertisement.class) - .register(DeviceFragmentId.class) - .register(PortFragmentId.class) - .register(DeviceInjectedEvent.class) - .register(PortInjectedEvent.class) - .build(); - } - }; - - private ExecutorService executor; - - private ScheduledExecutorService backgroundExecutor; - - // TODO make these anti-entropy parameters configurable - private long initialDelaySec = 5; - private long periodSec = 5; - - @Activate - public void activate() { - executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d")); - - backgroundExecutor = - newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d"))); - - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, - new InternalDeviceOfflineEventListener(), - executor); - clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, - new InternalRemoveRequestListener(), - executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, - new InternalDeviceAdvertisementListener(), - backgroundExecutor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor); - clusterCommunicator.addSubscriber( - GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor); - - // start anti-entropy thread - backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), - initialDelaySec, periodSec, TimeUnit.SECONDS); - - // Create a distributed map for port stats. - KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder() - .register(KryoNamespaces.API) - .register(DefaultPortStatistics.class) - .register(DeviceId.class) - .register(MultiValuedTimestamp.class) - .register(WallClockTimestamp.class); - - devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder() - .withName("port-stats") - .withSerializer(deviceDataSerializer) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>> - eventuallyConsistentMapBuilder() - .withName("port-stats-delta") - .withSerializer(deviceDataSerializer) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - devicePortStats.addListener(portStatsListener); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - devicePortStats.destroy(); - devicePortDeltaStats.destroy(); - executor.shutdownNow(); - - backgroundExecutor.shutdownNow(); - try { - if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - log.error("Timeout during executor shutdown"); - } - } catch (InterruptedException e) { - log.error("Error during executor shutdown", e); - } - - deviceDescs.clear(); - devices.clear(); - devicePorts.clear(); - availableDevices.clear(); - log.info("Stopped"); - } - - @Override - public int getDeviceCount() { - return devices.size(); - } - - @Override - public Iterable<Device> getDevices() { - return Collections.unmodifiableCollection(devices.values()); - } - - @Override - public Iterable<Device> getAvailableDevices() { - return FluentIterable.from(getDevices()) - .filter(input -> isAvailable(input.id())); - } - - @Override - public Device getDevice(DeviceId deviceId) { - return devices.get(deviceId); - } - - @Override - public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, - DeviceId deviceId, - DeviceDescription deviceDescription) { - NodeId localNode = clusterService.getLocalNode().id(); - NodeId deviceNode = mastershipService.getMasterFor(deviceId); - - // Process device update only if we're the master, - // otherwise signal the actual master. - DeviceEvent deviceEvent = null; - if (localNode.equals(deviceNode)) { - - final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId); - final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); - final Timestamped<DeviceDescription> mergedDesc; - final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); - - synchronized (device) { - deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc); - mergedDesc = device.get(providerId).getDeviceDesc(); - } - - if (deviceEvent != null) { - log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); - notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc)); - } - - } else { - // FIXME Temporary hack for NPE (ONOS-1171). - // Proper fix is to implement forwarding to master on ConfigProvider - // redo ONOS-490 - if (deviceNode == null) { - // silently ignore - return null; - } - - - DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent( - providerId, deviceId, deviceDescription); - - // TODO check unicast return value - clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode); - /* error log: - log.warn("Failed to process injected device id: {} desc: {} " + - "(cluster messaging failed: {})", - deviceId, deviceDescription, e); - */ - } - - return deviceEvent; - } - - private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, - DeviceId deviceId, - Timestamped<DeviceDescription> deltaDesc) { - - // Collection of DeviceDescriptions for a Device - Map<ProviderId, DeviceDescriptions> device - = getOrCreateDeviceDescriptionsMap(deviceId); - - synchronized (device) { - // locking per device - - if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) { - log.debug("Ignoring outdated event: {}", deltaDesc); - return null; - } - - DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc); - - final Device oldDevice = devices.get(deviceId); - final Device newDevice; - - if (deltaDesc == descs.getDeviceDesc() || - deltaDesc.isNewer(descs.getDeviceDesc())) { - // on new device or valid update - descs.putDeviceDesc(deltaDesc); - newDevice = composeDevice(deviceId, device); - } else { - // outdated event, ignored. - return null; - } - if (oldDevice == null) { - // ADD - return createDevice(providerId, newDevice, deltaDesc.timestamp()); - } else { - // UPDATE or ignore (no change or stale) - return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp()); - } - } - } - - // Creates the device and returns the appropriate event if necessary. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent createDevice(ProviderId providerId, - Device newDevice, Timestamp timestamp) { - - // update composed device cache - Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); - verify(oldDevice == null, - "Unexpected Device in cache. PID:%s [old=%s, new=%s]", - providerId, oldDevice, newDevice); - - if (!providerId.isAncillary()) { - markOnline(newDevice.id(), timestamp); - } - - return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); - } - - // Updates the device and returns the appropriate event if necessary. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent updateDevice(ProviderId providerId, - Device oldDevice, - Device newDevice, Timestamp newTimestamp) { - // We allow only certain attributes to trigger update - boolean propertiesChanged = - !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || - !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) || - !Objects.equals(oldDevice.providerId(), newDevice.providerId()); - boolean annotationsChanged = - !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations()); - - // Primary providers can respond to all changes, but ancillary ones - // should respond only to annotation changes. - DeviceEvent event = null; - if ((providerId.isAncillary() && annotationsChanged) || - (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) { - boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice); - if (!replaced) { - verify(replaced, - "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", - providerId, oldDevice, devices.get(newDevice.id()) - , newDevice); - } - - event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); - } - - if (!providerId.isAncillary()) { - boolean wasOnline = availableDevices.contains(newDevice.id()); - markOnline(newDevice.id(), newTimestamp); - if (!wasOnline) { - notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null)); - } - } - return event; - } - - @Override - public DeviceEvent markOffline(DeviceId deviceId) { - final Timestamp timestamp = deviceClockService.getTimestamp(deviceId); - final DeviceEvent event = markOfflineInternal(deviceId, timestamp); - if (event != null) { - log.debug("Notifying peers of a device offline topology event for deviceId: {} {}", - deviceId, timestamp); - notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp)); - } - return event; - } - - private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { - - Map<ProviderId, DeviceDescriptions> providerDescs - = getOrCreateDeviceDescriptionsMap(deviceId); - - // locking device - synchronized (providerDescs) { - - // accept off-line if given timestamp is newer than - // the latest Timestamp from Primary provider - DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs); - Timestamp lastTimestamp = primDescs.getLatestTimestamp(); - if (timestamp.compareTo(lastTimestamp) <= 0) { - // outdated event ignore - return null; - } - - offline.put(deviceId, timestamp); - - Device device = devices.get(deviceId); - if (device == null) { - return null; - } - boolean removed = availableDevices.remove(deviceId); - if (removed) { - return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); - } - return null; - } - } - - /** - * Marks the device as available if the given timestamp is not outdated, - * compared to the time the device has been marked offline. - * - * @param deviceId identifier of the device - * @param timestamp of the event triggering this change. - * @return true if availability change request was accepted and changed the state - */ - // Guarded by deviceDescs value (=Device lock) - private boolean markOnline(DeviceId deviceId, Timestamp timestamp) { - // accept on-line if given timestamp is newer than - // the latest offline request Timestamp - Timestamp offlineTimestamp = offline.get(deviceId); - if (offlineTimestamp == null || - offlineTimestamp.compareTo(timestamp) < 0) { - - offline.remove(deviceId); - return availableDevices.add(deviceId); - } - return false; - } - - @Override - public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, - DeviceId deviceId, - List<PortDescription> portDescriptions) { - - NodeId localNode = clusterService.getLocalNode().id(); - // TODO: It might be negligible, but this will have negative impact to topology discovery performance, - // since it will trigger distributed store read. - // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc. - // outside Device subsystem. so that we don't have to modify both Device and Link stores. - // If we don't care much about topology performance, then it might be OK. - NodeId deviceNode = mastershipService.getMasterFor(deviceId); - - // Process port update only if we're the master of the device, - // otherwise signal the actual master. - List<DeviceEvent> deviceEvents = null; - if (localNode.equals(deviceNode)) { - - final Timestamp newTimestamp; - try { - newTimestamp = deviceClockService.getTimestamp(deviceId); - } catch (IllegalStateException e) { - log.info("Timestamp was not available for device {}", deviceId); - log.debug(" discarding {}", portDescriptions); - // Failed to generate timestamp. - - // Possible situation: - // Device connected and became master for short period of time, - // but lost mastership before this instance had the chance to - // retrieve term information. - - // Information dropped here is expected to be recoverable by - // device probing after mastership change - - return Collections.emptyList(); - } - log.debug("timestamp for {} {}", deviceId, newTimestamp); - - final Timestamped<List<PortDescription>> timestampedInput - = new Timestamped<>(portDescriptions, newTimestamp); - final Timestamped<List<PortDescription>> merged; - - final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); - - synchronized (device) { - deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput); - final DeviceDescriptions descs = device.get(providerId); - List<PortDescription> mergedList = - FluentIterable.from(portDescriptions) - .transform(input -> - // lookup merged port description - descs.getPortDesc(input.portNumber()).value() - ).toList(); - merged = new Timestamped<>(mergedList, newTimestamp); - } - - if (!deviceEvents.isEmpty()) { - log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); - notifyPeers(new InternalPortEvent(providerId, deviceId, merged)); - } - - } else { - // FIXME Temporary hack for NPE (ONOS-1171). - // Proper fix is to implement forwarding to master on ConfigProvider - // redo ONOS-490 - if (deviceNode == null) { - // silently ignore - return Collections.emptyList(); - } - - PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions); - - //TODO check unicast return value - clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode); - /* error log: - log.warn("Failed to process injected ports of device id: {} " + - "(cluster messaging failed: {})", - deviceId, e); - */ - } - - return deviceEvents == null ? Collections.emptyList() : deviceEvents; - } - - private List<DeviceEvent> updatePortsInternal(ProviderId providerId, - DeviceId deviceId, - Timestamped<List<PortDescription>> portDescriptions) { - - Device device = devices.get(deviceId); - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); - - Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); - checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - - List<DeviceEvent> events = new ArrayList<>(); - synchronized (descsMap) { - - if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) { - log.debug("Ignoring outdated events: {}", portDescriptions); - return Collections.emptyList(); - } - - DeviceDescriptions descs = descsMap.get(providerId); - // every provider must provide DeviceDescription. - checkArgument(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); - - Map<PortNumber, Port> ports = getPortMap(deviceId); - - final Timestamp newTimestamp = portDescriptions.timestamp(); - - // Add new ports - Set<PortNumber> processed = new HashSet<>(); - for (PortDescription portDescription : portDescriptions.value()) { - final PortNumber number = portDescription.portNumber(); - processed.add(number); - - final Port oldPort = ports.get(number); - final Port newPort; - - - final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); - if (existingPortDesc == null || - newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) { - // on new port or valid update - // update description - descs.putPortDesc(new Timestamped<>(portDescription, - portDescriptions.timestamp())); - newPort = composePort(device, number, descsMap); - } else { - // outdated event, ignored. - continue; - } - - events.add(oldPort == null ? - createPort(device, newPort, ports) : - updatePort(device, oldPort, newPort, ports)); - } - - events.addAll(pruneOldPorts(device, ports, processed)); - } - return FluentIterable.from(events).filter(notNull()).toList(); - } - - // Creates a new port based on the port description adds it to the map and - // Returns corresponding event. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent createPort(Device device, Port newPort, - Map<PortNumber, Port> ports) { - ports.put(newPort.number(), newPort); - return new DeviceEvent(PORT_ADDED, device, newPort); - } - - // Checks if the specified port requires update and if so, it replaces the - // existing entry in the map and returns corresponding event. - // Guarded by deviceDescs value (=Device lock) - private DeviceEvent updatePort(Device device, Port oldPort, - Port newPort, - Map<PortNumber, Port> ports) { - if (oldPort.isEnabled() != newPort.isEnabled() || - oldPort.type() != newPort.type() || - oldPort.portSpeed() != newPort.portSpeed() || - !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) { - ports.put(oldPort.number(), newPort); - return new DeviceEvent(PORT_UPDATED, device, newPort); - } - return null; - } - - // Prunes the specified list of ports based on which ports are in the - // processed list and returns list of corresponding events. - // Guarded by deviceDescs value (=Device lock) - private List<DeviceEvent> pruneOldPorts(Device device, - Map<PortNumber, Port> ports, - Set<PortNumber> processed) { - List<DeviceEvent> events = new ArrayList<>(); - Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<PortNumber, Port> e = iterator.next(); - PortNumber portNumber = e.getKey(); - if (!processed.contains(portNumber)) { - events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue())); - iterator.remove(); - } - } - return events; - } - - // Gets the map of ports for the specified device; if one does not already - // exist, it creates and registers a new one. - private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) { - return createIfAbsentUnchecked(devicePorts, deviceId, - NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); - } - - private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap( - DeviceId deviceId) { - Map<ProviderId, DeviceDescriptions> r; - r = deviceDescs.get(deviceId); - if (r == null) { - r = new HashMap<>(); - final Map<ProviderId, DeviceDescriptions> concurrentlyAdded; - concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r); - if (concurrentlyAdded != null) { - r = concurrentlyAdded; - } - } - return r; - } - - // Guarded by deviceDescs value (=Device lock) - private DeviceDescriptions getOrCreateProviderDeviceDescriptions( - Map<ProviderId, DeviceDescriptions> device, - ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) { - synchronized (device) { - DeviceDescriptions r = device.get(providerId); - if (r == null) { - r = new DeviceDescriptions(deltaDesc); - device.put(providerId, r); - } - return r; - } - } - - @Override - public synchronized DeviceEvent updatePortStatus(ProviderId providerId, - DeviceId deviceId, - PortDescription portDescription) { - final Timestamp newTimestamp; - try { - newTimestamp = deviceClockService.getTimestamp(deviceId); - } catch (IllegalStateException e) { - log.info("Timestamp was not available for device {}", deviceId); - log.debug(" discarding {}", portDescription); - // Failed to generate timestamp. Ignoring. - // See updatePorts comment - return null; - } - final Timestamped<PortDescription> deltaDesc - = new Timestamped<>(portDescription, newTimestamp); - final DeviceEvent event; - final Timestamped<PortDescription> mergedDesc; - final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); - synchronized (device) { - event = updatePortStatusInternal(providerId, deviceId, deltaDesc); - mergedDesc = device.get(providerId) - .getPortDesc(portDescription.portNumber()); - } - if (event != null) { - log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); - notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc)); - } - return event; - } - - private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId, - Timestamped<PortDescription> deltaDesc) { - Device device = devices.get(deviceId); - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); - - Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); - checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - - synchronized (descsMap) { - - if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) { - log.debug("Ignoring outdated event: {}", deltaDesc); - return null; - } - - DeviceDescriptions descs = descsMap.get(providerId); - // assuming all providers must to give DeviceDescription - verify(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); - - ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); - final PortNumber number = deltaDesc.value().portNumber(); - final Port oldPort = ports.get(number); - final Port newPort; - - final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); - if (existingPortDesc == null || - deltaDesc.isNewer(existingPortDesc)) { - // on new port or valid update - // update description - descs.putPortDesc(deltaDesc); - newPort = composePort(device, number, descsMap); - } else { - // same or outdated event, ignored. - log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc); - return null; - } - - if (oldPort == null) { - return createPort(device, newPort, ports); - } else { - return updatePort(device, oldPort, newPort, ports); - } - } - } - - @Override - public List<Port> getPorts(DeviceId deviceId) { - Map<PortNumber, Port> ports = devicePorts.get(deviceId); - if (ports == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(ports.values()); - } - - @Override - public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId, - Collection<PortStatistics> newStatsCollection) { - - Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId); - Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap(); - Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap(); - - if (prvStatsMap != null) { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - PortStatistics prvStats = prvStatsMap.get(port); - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - PortStatistics deltaStats = builder.build(); - if (prvStats != null) { - deltaStats = calcDeltaStats(deviceId, prvStats, newStats); - } - deltaStatsMap.put(port, deltaStats); - newStatsMap.put(port, newStats); - } - } else { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - newStatsMap.put(port, newStats); - } - } - devicePortDeltaStats.put(deviceId, deltaStatsMap); - devicePortStats.put(deviceId, newStatsMap); - // DeviceEvent returns null because of InternalPortStatsListener usage - return null; - } - - /** - * Calculate delta statistics by subtracting previous from new statistics. - * - * @param deviceId device identifier - * @param prvStats previous port statistics - * @param newStats new port statistics - * @return PortStatistics - */ - public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) { - // calculate time difference - long deltaStatsSec, deltaStatsNano; - if (newStats.durationNano() < prvStats.durationNano()) { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L; - } else { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano(); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec(); - } - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId) - .setPort(newStats.port()) - .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived()) - .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent()) - .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived()) - .setBytesSent(newStats.bytesSent() - prvStats.bytesSent()) - .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped()) - .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped()) - .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors()) - .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors()) - .setDurationSec(deltaStatsSec) - .setDurationNano(deltaStatsNano) - .build(); - return deltaStats; - } - - @Override - public List<PortStatistics> getPortStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public Port getPort(DeviceId deviceId, PortNumber portNumber) { - Map<PortNumber, Port> ports = devicePorts.get(deviceId); - return ports == null ? null : ports.get(portNumber); - } - - @Override - public boolean isAvailable(DeviceId deviceId) { - return availableDevices.contains(deviceId); - } - - @Override - public synchronized DeviceEvent removeDevice(DeviceId deviceId) { - final NodeId myId = clusterService.getLocalNode().id(); - NodeId master = mastershipService.getMasterFor(deviceId); - - // if there exist a master, forward - // if there is no master, try to become one and process - - boolean relinquishAtEnd = false; - if (master == null) { - final MastershipRole myRole = mastershipService.getLocalRole(deviceId); - if (myRole != MastershipRole.NONE) { - relinquishAtEnd = true; - } - log.debug("Temporarily requesting role for {} to remove", deviceId); - mastershipService.requestRoleFor(deviceId); - MastershipTerm term = termService.getMastershipTerm(deviceId); - if (term != null && myId.equals(term.master())) { - master = myId; - } - } - - if (!myId.equals(master)) { - log.debug("{} has control of {}, forwarding remove request", - master, deviceId); - - // TODO check unicast return value - clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master); - /* error log: - log.error("Failed to forward {} remove request to {}", deviceId, master, e); - */ - - // event will be triggered after master processes it. - return null; - } - - // I have control.. - - Timestamp timestamp = deviceClockService.getTimestamp(deviceId); - DeviceEvent event = removeDeviceInternal(deviceId, timestamp); - if (event != null) { - log.debug("Notifying peers of a device removed topology event for deviceId: {}", - deviceId); - notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp)); - } - if (relinquishAtEnd) { - log.debug("Relinquishing temporary role acquired for {}", deviceId); - mastershipService.relinquishMastership(deviceId); - } - return event; - } - - private DeviceEvent removeDeviceInternal(DeviceId deviceId, - Timestamp timestamp) { - - Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId); - synchronized (descs) { - // accept removal request if given timestamp is newer than - // the latest Timestamp from Primary provider - DeviceDescriptions primDescs = getPrimaryDescriptions(descs); - if (primDescs == null) { - return null; - } - - Timestamp lastTimestamp = primDescs.getLatestTimestamp(); - if (timestamp.compareTo(lastTimestamp) <= 0) { - // outdated event ignore - return null; - } - removalRequest.put(deviceId, timestamp); - - Device device = devices.remove(deviceId); - // should DEVICE_REMOVED carry removed ports? - Map<PortNumber, Port> ports = devicePorts.get(deviceId); - if (ports != null) { - ports.clear(); - } - markOfflineInternal(deviceId, timestamp); - descs.clear(); - return device == null ? null : - new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null); - } - } - - /** - * Checks if given timestamp is superseded by removal request - * with more recent timestamp. - * - * @param deviceId identifier of a device - * @param timestampToCheck timestamp of an event to check - * @return true if device is already removed - */ - private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) { - Timestamp removalTimestamp = removalRequest.get(deviceId); - if (removalTimestamp != null && - removalTimestamp.compareTo(timestampToCheck) >= 0) { - // removalRequest is more recent - return true; - } - return false; - } - - /** - * Returns a Device, merging description given from multiple Providers. - * - * @param deviceId device identifier - * @param providerDescs Collection of Descriptions from multiple providers - * @return Device instance - */ - private Device composeDevice(DeviceId deviceId, - Map<ProviderId, DeviceDescriptions> providerDescs) { - - checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied"); - - ProviderId primary = pickPrimaryPid(providerDescs); - - DeviceDescriptions desc = providerDescs.get(primary); - - final DeviceDescription base = desc.getDeviceDesc().value(); - Type type = base.type(); - String manufacturer = base.manufacturer(); - String hwVersion = base.hwVersion(); - String swVersion = base.swVersion(); - String serialNumber = base.serialNumber(); - ChassisId chassisId = base.chassisId(); - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - annotations = merge(annotations, base.annotations()); - - for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { - if (e.getKey().equals(primary)) { - continue; - } - // Note: should keep track of Description timestamp in the future - // and only merge conflicting keys when timestamp is newer. - // Currently assuming there will never be a key conflict between - // providers - - // annotation merging. not so efficient, should revisit later - annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations()); - } - - return new DefaultDevice(primary, deviceId, type, manufacturer, - hwVersion, swVersion, serialNumber, - chassisId, annotations); - } - - private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled, - PortDescription description, Annotations annotations) { - switch (description.type()) { - case OMS: - OmsPortDescription omsDesc = (OmsPortDescription) description; - return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(), - omsDesc.maxFrequency(), omsDesc.grid(), annotations); - case OCH: - OchPortDescription ochDesc = (OchPortDescription) description; - return new OchPort(device, number, isEnabled, ochDesc.signalType(), - ochDesc.isTunable(), ochDesc.lambda(), annotations); - case ODUCLT: - OduCltPortDescription oduDesc = (OduCltPortDescription) description; - return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations); - default: - return new DefaultPort(device, number, isEnabled, description.type(), - description.portSpeed(), annotations); - } - } - - /** - * Returns a Port, merging description given from multiple Providers. - * - * @param device device the port is on - * @param number port number - * @param descsMap Collection of Descriptions from multiple providers - * @return Port instance - */ - private Port composePort(Device device, PortNumber number, - Map<ProviderId, DeviceDescriptions> descsMap) { - - ProviderId primary = pickPrimaryPid(descsMap); - DeviceDescriptions primDescs = descsMap.get(primary); - // if no primary, assume not enabled - boolean isEnabled = false; - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - Timestamp newest = null; - final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number); - if (portDesc != null) { - isEnabled = portDesc.value().isEnabled(); - annotations = merge(annotations, portDesc.value().annotations()); - newest = portDesc.timestamp(); - } - Port updated = null; - for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) { - if (e.getKey().equals(primary)) { - continue; - } - // Note: should keep track of Description timestamp in the future - // and only merge conflicting keys when timestamp is newer. - // Currently assuming there will never be a key conflict between - // providers - - // annotation merging. not so efficient, should revisit later - final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number); - if (otherPortDesc != null) { - if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) { - continue; - } - annotations = merge(annotations, otherPortDesc.value().annotations()); - PortDescription other = otherPortDesc.value(); - updated = buildTypedPort(device, number, isEnabled, other, annotations); - newest = otherPortDesc.timestamp(); - } - } - if (portDesc == null) { - return updated == null ? new DefaultPort(device, number, false, annotations) : updated; - } - PortDescription current = portDesc.value(); - return updated == null - ? buildTypedPort(device, number, isEnabled, current, annotations) - : updated; - } - - /** - * @return primary ProviderID, or randomly chosen one if none exists - */ - private ProviderId pickPrimaryPid( - Map<ProviderId, DeviceDescriptions> providerDescs) { - ProviderId fallBackPrimary = null; - for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { - if (!e.getKey().isAncillary()) { - return e.getKey(); - } else if (fallBackPrimary == null) { - // pick randomly as a fallback in case there is no primary - fallBackPrimary = e.getKey(); - } - } - return fallBackPrimary; - } - - private DeviceDescriptions getPrimaryDescriptions( - Map<ProviderId, DeviceDescriptions> providerDescs) { - ProviderId pid = pickPrimaryPid(providerDescs); - return providerDescs.get(pid); - } - - private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException { - clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient); - } - - private void broadcastMessage(MessageSubject subject, Object event) { - clusterCommunicator.broadcast(event, subject, SERIALIZER::encode); - } - - private void notifyPeers(InternalDeviceEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event); - } - - private void notifyPeers(InternalDeviceOfflineEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event); - } - - private void notifyPeers(InternalDeviceRemovedEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event); - } - - private void notifyPeers(InternalPortEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event); - } - - private void notifyPeers(InternalPortStatusEvent event) { - broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event); - } - - private void notifyPeer(NodeId recipient, InternalDeviceEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalPortEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) { - try { - unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event); - } catch (IOException e) { - log.error("Failed to send" + event + " to " + recipient, e); - } - } - - private DeviceAntiEntropyAdvertisement createAdvertisement() { - final NodeId self = clusterService.getLocalNode().id(); - - final int numDevices = deviceDescs.size(); - Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices); - final int portsPerDevice = 8; // random factor to minimize reallocation - Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice); - Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices); - - deviceDescs.forEach((deviceId, devDescs) -> { - - // for each Device... - synchronized (devDescs) { - - // send device offline timestamp - Timestamp lOffline = this.offline.get(deviceId); - if (lOffline != null) { - adOffline.put(deviceId, lOffline); - } - - for (Entry<ProviderId, DeviceDescriptions> - prov : devDescs.entrySet()) { - - // for each Provider Descriptions... - final ProviderId provId = prov.getKey(); - final DeviceDescriptions descs = prov.getValue(); - - adDevices.put(new DeviceFragmentId(deviceId, provId), - descs.getDeviceDesc().timestamp()); - - for (Entry<PortNumber, Timestamped<PortDescription>> - portDesc : descs.getPortDescs().entrySet()) { - - final PortNumber number = portDesc.getKey(); - adPorts.put(new PortFragmentId(deviceId, provId, number), - portDesc.getValue().timestamp()); - } - } - } - }); - - return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline); - } - - /** - * Responds to anti-entropy advertisement message. - * <p/> - * Notify sender about out-dated information using regular replication message. - * Send back advertisement to sender if not in sync. - * - * @param advertisement to respond to - */ - private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) { - - final NodeId sender = advertisement.sender(); - - Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints()); - Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports()); - Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline()); - - // Fragments to request - Collection<DeviceFragmentId> reqDevices = new ArrayList<>(); - Collection<PortFragmentId> reqPorts = new ArrayList<>(); - - for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) { - final DeviceId deviceId = de.getKey(); - final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue(); - - synchronized (lDevice) { - // latestTimestamp across provider - // Note: can be null initially - Timestamp localLatest = offline.get(deviceId); - - // handle device Ads - for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) { - final ProviderId provId = prov.getKey(); - final DeviceDescriptions lDeviceDescs = prov.getValue(); - - final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId); - - - Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc(); - Timestamp advDevTimestamp = devAds.get(devFragId); - - if (advDevTimestamp == null || lProvDevice.isNewerThan( - advDevTimestamp)) { - // remote does not have it or outdated, suggest - notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice)); - } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) { - // local is outdated, request - reqDevices.add(devFragId); - } - - // handle port Ads - for (Entry<PortNumber, Timestamped<PortDescription>> - pe : lDeviceDescs.getPortDescs().entrySet()) { - - final PortNumber num = pe.getKey(); - final Timestamped<PortDescription> lPort = pe.getValue(); - - final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num); - - Timestamp advPortTimestamp = portAds.get(portFragId); - if (advPortTimestamp == null || lPort.isNewerThan( - advPortTimestamp)) { - // remote does not have it or outdated, suggest - notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort)); - } else if (!lPort.timestamp().equals(advPortTimestamp)) { - // local is outdated, request - log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp); - reqPorts.add(portFragId); - } - - // remove port Ad already processed - portAds.remove(portFragId); - } // end local port loop - - // remove device Ad already processed - devAds.remove(devFragId); - - // find latest and update - final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp(); - if (localLatest == null || - providerLatest.compareTo(localLatest) > 0) { - localLatest = providerLatest; - } - } // end local provider loop - - // checking if remote timestamp is more recent. - Timestamp rOffline = offlineAds.get(deviceId); - if (rOffline != null && - rOffline.compareTo(localLatest) > 0) { - // remote offline timestamp suggests that the - // device is off-line - markOfflineInternal(deviceId, rOffline); - } - - Timestamp lOffline = offline.get(deviceId); - if (lOffline != null && rOffline == null) { - // locally offline, but remote is online, suggest offline - notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline)); - } - - // remove device offline Ad already processed - offlineAds.remove(deviceId); - } // end local device loop - } // device lock - - // If there is any Ads left, request them - log.trace("Ads left {}, {}", devAds, portAds); - reqDevices.addAll(devAds.keySet()); - reqPorts.addAll(portAds.keySet()); - - if (reqDevices.isEmpty() && reqPorts.isEmpty()) { - log.trace("Nothing to request to remote peer {}", sender); - return; - } - - log.debug("Need to sync {} {}", reqDevices, reqPorts); - - // 2-way Anti-Entropy for now - try { - unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement()); - } catch (IOException e) { - log.error("Failed to send response advertisement to " + sender, e); - } - -// Sketch of 3-way Anti-Entropy -// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts); -// ClusterMessage message = new ClusterMessage( -// clusterService.getLocalNode().id(), -// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST, -// SERIALIZER.encode(request)); -// -// try { -// clusterCommunicator.unicast(message, advertisement.sender()); -// } catch (IOException e) { -// log.error("Failed to send advertisement reply to " -// + advertisement.sender(), e); -// } - } - - private void notifyDelegateIfNotNull(DeviceEvent event) { - if (event != null) { - notifyDelegate(event); - } - } - - private final class SendAdvertisementTask implements Runnable { - - @Override - public void run() { - if (Thread.currentThread().isInterrupted()) { - log.debug("Interrupted, quitting"); - return; - } - - try { - final NodeId self = clusterService.getLocalNode().id(); - Set<ControllerNode> nodes = clusterService.getNodes(); - - ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes) - .transform(toNodeId()) - .toList(); - - if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) { - log.trace("No other peers in the cluster."); - return; - } - - NodeId peer; - do { - int idx = RandomUtils.nextInt(0, nodeIds.size()); - peer = nodeIds.get(idx); - } while (peer.equals(self)); - - DeviceAntiEntropyAdvertisement ad = createAdvertisement(); - - if (Thread.currentThread().isInterrupted()) { - log.debug("Interrupted, quitting"); - return; - } - - try { - unicastMessage(peer, DEVICE_ADVERTISE, ad); - } catch (IOException e) { - log.debug("Failed to send anti-entropy advertisement to {}", peer); - return; - } - } catch (Exception e) { - // catch all Exception to avoid Scheduled task being suppressed. - log.error("Exception thrown while sending advertisement", e); - } - } - } - - private final class InternalDeviceEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device update event from peer: {}", message.sender()); - InternalDeviceEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); - - try { - notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription)); - } catch (Exception e) { - log.warn("Exception thrown handling device update", e); - } - } - } - - private final class InternalDeviceOfflineEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device offline event from peer: {}", message.sender()); - InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload()); - - DeviceId deviceId = event.deviceId(); - Timestamp timestamp = event.timestamp(); - - try { - notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp)); - } catch (Exception e) { - log.warn("Exception thrown handling device offline", e); - } - } - } - - private final class InternalRemoveRequestListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device remove request from peer: {}", message.sender()); - DeviceId did = SERIALIZER.decode(message.payload()); - - try { - removeDevice(did); - } catch (Exception e) { - log.warn("Exception thrown handling device remove", e); - } - } - } - - private final class InternalDeviceRemovedEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received device removed event from peer: {}", message.sender()); - InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload()); - - DeviceId deviceId = event.deviceId(); - Timestamp timestamp = event.timestamp(); - - try { - notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp)); - } catch (Exception e) { - log.warn("Exception thrown handling device removed", e); - } - } - } - - private final class InternalPortEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - - log.debug("Received port update event from peer: {}", message.sender()); - InternalPortEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions(); - - if (getDevice(deviceId) == null) { - log.debug("{} not found on this node yet, ignoring.", deviceId); - // Note: dropped information will be recovered by anti-entropy - return; - } - - try { - notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions)); - } catch (Exception e) { - log.warn("Exception thrown handling port update", e); - } - } - } - - private final class InternalPortStatusEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - - log.debug("Received port status update event from peer: {}", message.sender()); - InternalPortStatusEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - Timestamped<PortDescription> portDescription = event.portDescription(); - - if (getDevice(deviceId) == null) { - log.debug("{} not found on this node yet, ignoring.", deviceId); - // Note: dropped information will be recovered by anti-entropy - return; - } - - try { - notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription)); - } catch (Exception e) { - log.warn("Exception thrown handling port update", e); - } - } - } - - private final class InternalDeviceAdvertisementListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender()); - DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); - try { - handleAdvertisement(advertisement); - } catch (Exception e) { - log.warn("Exception thrown handling Device advertisements.", e); - } - } - } - - private final class DeviceInjectedEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received injected device event from peer: {}", message.sender()); - DeviceInjectedEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - DeviceDescription deviceDescription = event.deviceDescription(); - if (!deviceClockService.isTimestampAvailable(deviceId)) { - // workaround for ONOS-1208 - log.warn("Not ready to accept update. Dropping {}", deviceDescription); - return; - } - - try { - createOrUpdateDevice(providerId, deviceId, deviceDescription); - } catch (Exception e) { - log.warn("Exception thrown handling device injected event.", e); - } - } - } - - private final class PortInjectedEventListener - implements ClusterMessageHandler { - @Override - public void handle(ClusterMessage message) { - log.debug("Received injected port event from peer: {}", message.sender()); - PortInjectedEvent event = SERIALIZER.decode(message.payload()); - - ProviderId providerId = event.providerId(); - DeviceId deviceId = event.deviceId(); - List<PortDescription> portDescriptions = event.portDescriptions(); - if (!deviceClockService.isTimestampAvailable(deviceId)) { - // workaround for ONOS-1208 - log.warn("Not ready to accept update. Dropping {}", portDescriptions); - return; - } - - try { - updatePorts(providerId, deviceId, portDescriptions); - } catch (Exception e) { - log.warn("Exception thrown handling port injected event.", e); - } - } - } - - private class InternalPortStatsListener - implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> { - @Override - public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) { - if (event.type() == PUT) { - Device device = devices.get(event.key()); - if (device != null) { - delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device)); - } - } - } - } -} |