diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java | 784 |
1 files changed, 784 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java new file mode 100644 index 00000000..2dae55bb --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java @@ -0,0 +1,784 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.device.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static org.onosproject.net.DefaultAnnotations.merge; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.packet.ChassisId; +import org.onlab.util.KryoNamespace; +import org.onlab.util.SharedExecutors; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.mastership.MastershipService; +import org.onosproject.mastership.MastershipTermService; +import org.onosproject.net.Annotations; +import org.onosproject.net.AnnotationsUtil; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.DefaultDevice; +import org.onosproject.net.DefaultPort; +import org.onosproject.net.Device; +import org.onosproject.net.DeviceId; +import org.onosproject.net.MastershipRole; +import org.onosproject.net.OchPort; +import org.onosproject.net.OduCltPort; +import org.onosproject.net.OmsPort; +import org.onosproject.net.Port; +import org.onosproject.net.PortNumber; +import org.onosproject.net.Device.Type; +import org.onosproject.net.device.DefaultPortStatistics; +import org.onosproject.net.device.DeviceClockService; +import org.onosproject.net.device.DeviceDescription; +import org.onosproject.net.device.DeviceEvent; +import org.onosproject.net.device.DeviceStore; +import org.onosproject.net.device.DeviceStoreDelegate; +import org.onosproject.net.device.OchPortDescription; +import org.onosproject.net.device.OduCltPortDescription; +import org.onosproject.net.device.OmsPortDescription; +import org.onosproject.net.device.PortDescription; +import org.onosproject.net.device.PortStatistics; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.impl.MastershipBasedTimestamp; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.serializers.KryoSerializer; +import org.onosproject.store.serializers.custom.DistributedStoreSerializers; +import org.onosproject.store.service.DistributedSet; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.SetEvent; +import org.onosproject.store.service.SetEventListener; +import org.onosproject.store.service.WallClockTimestamp; + +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; + +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.StorageService; +import org.slf4j.Logger; + +import static org.onosproject.net.device.DeviceEvent.Type.*; +import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED; +import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ; +import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; + +/** + * Manages the inventory of devices using a {@code EventuallyConsistentMap}. + */ +@Component(immediate = true, enabled = false) +@Service +public class ECDeviceStore + extends AbstractStore<DeviceEvent, DeviceStoreDelegate> + implements DeviceStore { + + private final Logger log = getLogger(getClass()); + + private static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; + + private final Map<DeviceId, Device> devices = Maps.newConcurrentMap(); + private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap(); + Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet(); + + private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions; + private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions; + private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats; + private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats; + + private DistributedSet<DeviceId> availableDevices; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipTermService mastershipTermService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceClockService deviceClockService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + private NodeId localNodeId; + private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener = + new InternalDeviceChangeEventListener(); + private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener = + new InternalPortChangeEventListener(); + private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener = + new InternalPortStatsListener(); + private final SetEventListener<DeviceId> deviceStatusTracker = + new InternalDeviceStatusTracker(); + + protected static final KryoSerializer SERIALIZER = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoNamespace.newBuilder() + .register(DistributedStoreSerializers.STORE_COMMON) + .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) + .register(DeviceInjectedEvent.class) + .register(PortInjectedEvent.class) + .build(); + } + }; + + protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(DeviceKey.class) + .register(PortKey.class) + .register(DeviceKey.class) + .register(PortKey.class) + .register(MastershipBasedTimestamp.class); + + @Activate + public void activate() { + localNodeId = clusterService.getLocalNode().id(); + + deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder() + .withName("onos-device-descriptions") + .withSerializer(SERIALIZER_BUILDER) + .withTimestampProvider((k, v) -> { + try { + return deviceClockService.getTimestamp(k.deviceId()); + } catch (IllegalStateException e) { + return null; + } + }).build(); + + portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder() + .withName("onos-port-descriptions") + .withSerializer(SERIALIZER_BUILDER) + .withTimestampProvider((k, v) -> { + try { + return deviceClockService.getTimestamp(k.deviceId()); + } catch (IllegalStateException e) { + return null; + } + }).build(); + + devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder() + .withName("onos-port-stats") + .withSerializer(SERIALIZER_BUILDER) + .withAntiEntropyPeriod(5, TimeUnit.SECONDS) + .withTimestampProvider((k, v) -> new WallClockTimestamp()) + .withTombstonesDisabled() + .build(); + + devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>> + eventuallyConsistentMapBuilder() + .withName("onos-port-stats-delta") + .withSerializer(SERIALIZER_BUILDER) + .withAntiEntropyPeriod(5, TimeUnit.SECONDS) + .withTimestampProvider((k, v) -> new WallClockTimestamp()) + .withTombstonesDisabled() + .build(); + + clusterCommunicator.addSubscriber(DEVICE_INJECTED, + SERIALIZER::decode, + this::injectDevice, + SERIALIZER::encode, + SharedExecutors.getPoolThreadExecutor()); + + clusterCommunicator.addSubscriber(PORT_INJECTED, + SERIALIZER::decode, + this::injectPort, + SERIALIZER::encode, + SharedExecutors.getPoolThreadExecutor()); + + availableDevices = storageService.<DeviceId>setBuilder() + .withName("onos-online-devices") + .withSerializer(Serializer.using(KryoNamespaces.API)) + .withPartitionsDisabled() + .withRelaxedReadConsistency() + .build(); + + deviceDescriptions.addListener(deviceUpdateListener); + portDescriptions.addListener(portUpdateListener); + devicePortStats.addListener(portStatsListener); + availableDevices.addListener(deviceStatusTracker); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + devicePortStats.removeListener(portStatsListener); + deviceDescriptions.removeListener(deviceUpdateListener); + portDescriptions.removeListener(portUpdateListener); + availableDevices.removeListener(deviceStatusTracker); + devicePortStats.destroy(); + devicePortDeltaStats.destroy(); + deviceDescriptions.destroy(); + portDescriptions.destroy(); + devices.clear(); + devicePorts.clear(); + clusterCommunicator.removeSubscriber(DEVICE_INJECTED); + clusterCommunicator.removeSubscriber(PORT_INJECTED); + log.info("Stopped"); + } + + @Override + public Iterable<Device> getDevices() { + return devices.values(); + } + + @Override + public int getDeviceCount() { + return devices.size(); + } + + @Override + public Device getDevice(DeviceId deviceId) { + return devices.get(deviceId); + } + + @Override + public DeviceEvent createOrUpdateDevice(ProviderId providerId, + DeviceId deviceId, + DeviceDescription deviceDescription) { + NodeId master = mastershipService.getMasterFor(deviceId); + if (localNodeId.equals(master)) { + deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription); + return refreshDeviceCache(providerId, deviceId); + } else { + DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription); + return Futures.getUnchecked( + clusterCommunicator.sendAndReceive(deviceInjectedEvent, + DEVICE_INJECTED, + SERIALIZER::encode, + SERIALIZER::decode, + master)); + } + } + + private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) { + AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>(); + Device device = devices.compute(deviceId, (k, existingDevice) -> { + Device newDevice = composeDevice(deviceId); + if (existingDevice == null) { + eventType.set(DEVICE_ADDED); + } else { + // We allow only certain attributes to trigger update + boolean propertiesChanged = + !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) || + !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) || + !Objects.equals(existingDevice.providerId(), newDevice.providerId()); + boolean annotationsChanged = + !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations()); + + // Primary providers can respond to all changes, but ancillary ones + // should respond only to annotation changes. + if ((providerId.isAncillary() && annotationsChanged) || + (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) { + boolean replaced = devices.replace(deviceId, existingDevice, newDevice); + verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", + providerId, existingDevice, devices.get(deviceId), newDevice); + eventType.set(DEVICE_UPDATED); + } + } + return newDevice; + }); + if (eventType.get() != null && !providerId.isAncillary()) { + markOnline(deviceId); + } + return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null; + } + + /** + * Returns the primary providerId for a device. + * @param deviceId device identifier + * @return primary providerId + */ + private Set<ProviderId> getAllProviders(DeviceId deviceId) { + return deviceDescriptions.keySet() + .stream() + .filter(deviceKey -> deviceKey.deviceId().equals(deviceId)) + .map(deviceKey -> deviceKey.providerId()) + .collect(Collectors.toSet()); + } + + /** + * Returns the identifier for all providers for a device. + * @param deviceId device identifier + * @return set of provider identifiers + */ + private ProviderId getPrimaryProviderId(DeviceId deviceId) { + Set<ProviderId> allProviderIds = getAllProviders(deviceId); + return allProviderIds.stream() + .filter(p -> !p.isAncillary()) + .findFirst() + .orElse(Iterables.getFirst(allProviderIds, null)); + } + + /** + * Returns a Device, merging descriptions from multiple Providers. + * + * @param deviceId device identifier + * @return Device instance + */ + private Device composeDevice(DeviceId deviceId) { + + ProviderId primaryProviderId = getPrimaryProviderId(deviceId); + DeviceDescription primaryDeviceDescription = + deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId)); + + Type type = primaryDeviceDescription.type(); + String manufacturer = primaryDeviceDescription.manufacturer(); + String hwVersion = primaryDeviceDescription.hwVersion(); + String swVersion = primaryDeviceDescription.swVersion(); + String serialNumber = primaryDeviceDescription.serialNumber(); + ChassisId chassisId = primaryDeviceDescription.chassisId(); + DefaultAnnotations annotations = mergeAnnotations(deviceId); + + return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer, + hwVersion, swVersion, serialNumber, + chassisId, annotations); + } + + private DeviceEvent purgeDeviceCache(DeviceId deviceId) { + Device removedDevice = devices.remove(deviceId); + if (removedDevice != null) { + getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId))); + return new DeviceEvent(DEVICE_REMOVED, removedDevice); + } + return null; + } + + private boolean markOnline(DeviceId deviceId) { + return availableDevices.add(deviceId); + } + + @Override + public DeviceEvent markOffline(DeviceId deviceId) { + availableDevices.remove(deviceId); + // set update listener will raise the event. + return null; + } + + @Override + public List<DeviceEvent> updatePorts(ProviderId providerId, + DeviceId deviceId, + List<PortDescription> descriptions) { + NodeId master = mastershipService.getMasterFor(deviceId); + List<DeviceEvent> deviceEvents = null; + if (localNodeId.equals(master)) { + descriptions.forEach(description -> { + PortKey portKey = new PortKey(providerId, deviceId, description.portNumber()); + portDescriptions.put(portKey, description); + }); + deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty()); + } else { + if (master == null) { + return Collections.emptyList(); + } + PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions); + deviceEvents = Futures.getUnchecked( + clusterCommunicator.sendAndReceive(portInjectedEvent, + PORT_INJECTED, + SERIALIZER::encode, + SERIALIZER::decode, + master)); + } + return deviceEvents == null ? Collections.emptyList() : deviceEvents; + } + + private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId, + DeviceId deviceId, + Optional<PortNumber> portNumber) { + Device device = devices.get(deviceId); + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); + List<DeviceEvent> events = Lists.newArrayList(); + + Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap()); + List<PortDescription> descriptions = Lists.newArrayList(); + portDescriptions.entrySet().forEach(e -> { + PortKey key = e.getKey(); + PortDescription value = e.getValue(); + if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) { + if (portNumber.isPresent()) { + if (portNumber.get().equals(key.portNumber())) { + descriptions.add(value); + } + } else { + descriptions.add(value); + } + } + }); + + for (PortDescription description : descriptions) { + final PortNumber number = description.portNumber(); + ports.compute(number, (k, existingPort) -> { + Port newPort = composePort(device, number); + if (existingPort == null) { + events.add(new DeviceEvent(PORT_ADDED, device, newPort)); + } else { + if (existingPort.isEnabled() != newPort.isEnabled() || + existingPort.type() != newPort.type() || + existingPort.portSpeed() != newPort.portSpeed() || + !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) { + events.add(new DeviceEvent(PORT_UPDATED, device, newPort)); + } + } + return newPort; + }); + } + + return events; + } + + /** + * Returns a Port, merging descriptions from multiple Providers. + * + * @param device device the port is on + * @param number port number + * @return Port instance + */ + private Port composePort(Device device, PortNumber number) { + + Map<ProviderId, PortDescription> descriptions = Maps.newHashMap(); + portDescriptions.entrySet().forEach(entry -> { + PortKey portKey = entry.getKey(); + if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) { + descriptions.put(portKey.providerId(), entry.getValue()); + } + }); + ProviderId primary = getPrimaryProviderId(device.id()); + PortDescription primaryDescription = descriptions.get(primary); + + // if no primary, assume not enabled + boolean isEnabled = false; + DefaultAnnotations annotations = DefaultAnnotations.builder().build(); + if (primaryDescription != null) { + isEnabled = primaryDescription.isEnabled(); + annotations = merge(annotations, primaryDescription.annotations()); + } + Port updated = null; + for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) { + if (e.getKey().equals(primary)) { + continue; + } + annotations = merge(annotations, e.getValue().annotations()); + updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations); + } + if (primaryDescription == null) { + return updated == null ? new DefaultPort(device, number, false, annotations) : updated; + } + return updated == null + ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations) + : updated; + } + + private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled, + PortDescription description, Annotations annotations) { + switch (description.type()) { + case OMS: + OmsPortDescription omsDesc = (OmsPortDescription) description; + return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(), + omsDesc.maxFrequency(), omsDesc.grid(), annotations); + case OCH: + OchPortDescription ochDesc = (OchPortDescription) description; + return new OchPort(device, number, isEnabled, ochDesc.signalType(), + ochDesc.isTunable(), ochDesc.lambda(), annotations); + case ODUCLT: + OduCltPortDescription oduDesc = (OduCltPortDescription) description; + return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations); + default: + return new DefaultPort(device, number, isEnabled, description.type(), + description.portSpeed(), annotations); + } + } + + @Override + public DeviceEvent updatePortStatus(ProviderId providerId, + DeviceId deviceId, + PortDescription portDescription) { + portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription); + List<DeviceEvent> events = + refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber())); + return Iterables.getFirst(events, null); + } + + @Override + public List<Port> getPorts(DeviceId deviceId) { + return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values()); + } + + @Override + public Port getPort(DeviceId deviceId, PortNumber portNumber) { + return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber); + } + + @Override + public DeviceEvent updatePortStatistics(ProviderId providerId, + DeviceId deviceId, + Collection<PortStatistics> newStatsCollection) { + + Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId); + Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap(); + Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap(); + + if (prvStatsMap != null) { + for (PortStatistics newStats : newStatsCollection) { + PortNumber port = PortNumber.portNumber(newStats.port()); + PortStatistics prvStats = prvStatsMap.get(port); + DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); + PortStatistics deltaStats = builder.build(); + if (prvStats != null) { + deltaStats = calcDeltaStats(deviceId, prvStats, newStats); + } + deltaStatsMap.put(port, deltaStats); + newStatsMap.put(port, newStats); + } + } else { + for (PortStatistics newStats : newStatsCollection) { + PortNumber port = PortNumber.portNumber(newStats.port()); + newStatsMap.put(port, newStats); + } + } + devicePortDeltaStats.put(deviceId, deltaStatsMap); + devicePortStats.put(deviceId, newStatsMap); + // DeviceEvent returns null because of InternalPortStatsListener usage + return null; + } + + /** + * Calculate delta statistics by subtracting previous from new statistics. + * + * @param deviceId device indentifier + * @param prvStats previous port statistics + * @param newStats new port statistics + * @return PortStatistics + */ + public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) { + // calculate time difference + long deltaStatsSec, deltaStatsNano; + if (newStats.durationNano() < prvStats.durationNano()) { + deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1); + deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L; + } else { + deltaStatsNano = newStats.durationNano() - prvStats.durationNano(); + deltaStatsSec = newStats.durationSec() - prvStats.durationSec(); + } + DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); + DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId) + .setPort(newStats.port()) + .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived()) + .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent()) + .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived()) + .setBytesSent(newStats.bytesSent() - prvStats.bytesSent()) + .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped()) + .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped()) + .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors()) + .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors()) + .setDurationSec(deltaStatsSec) + .setDurationNano(deltaStatsNano) + .build(); + return deltaStats; + } + + @Override + public List<PortStatistics> getPortStatistics(DeviceId deviceId) { + Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId); + if (portStats == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(portStats.values()); + } + + @Override + public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) { + Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId); + if (portStats == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(portStats.values()); + } + + @Override + public boolean isAvailable(DeviceId deviceId) { + return availableDevices.contains(deviceId); + } + + @Override + public Iterable<Device> getAvailableDevices() { + return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null); + } + + @Override + public DeviceEvent removeDevice(DeviceId deviceId) { + NodeId master = mastershipService.getMasterFor(deviceId); + // if there exist a master, forward + // if there is no master, try to become one and process + boolean relinquishAtEnd = false; + if (master == null) { + final MastershipRole myRole = mastershipService.getLocalRole(deviceId); + if (myRole != MastershipRole.NONE) { + relinquishAtEnd = true; + } + log.debug("Temporarily requesting role for {} to remove", deviceId); + MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId)); + if (role == MastershipRole.MASTER) { + master = localNodeId; + } + } + + if (!localNodeId.equals(master)) { + log.debug("{} has control of {}, forwarding remove request", + master, deviceId); + + clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master) + .whenComplete((r, e) -> { + if (e != null) { + log.error("Failed to forward {} remove request to its master", deviceId, e); + } + }); + return null; + } + + // I have control.. + DeviceEvent event = null; + final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId); + DeviceDescription removedDeviceDescription = + deviceDescriptions.remove(deviceKey); + if (removedDeviceDescription != null) { + event = purgeDeviceCache(deviceId); + } + + if (relinquishAtEnd) { + log.debug("Relinquishing temporary role acquired for {}", deviceId); + mastershipService.relinquishMastership(deviceId); + } + return event; + } + + private DeviceEvent injectDevice(DeviceInjectedEvent event) { + return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription()); + } + + private List<DeviceEvent> injectPort(PortInjectedEvent event) { + return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions()); + } + + private DefaultAnnotations mergeAnnotations(DeviceId deviceId) { + ProviderId primaryProviderId = getPrimaryProviderId(deviceId); + DeviceDescription primaryDeviceDescription = + deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId)); + DefaultAnnotations annotations = DefaultAnnotations.builder().build(); + annotations = merge(annotations, primaryDeviceDescription.annotations()); + for (ProviderId providerId : getAllProviders(deviceId)) { + if (!providerId.equals(primaryProviderId)) { + annotations = merge(annotations, + deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations()); + } + } + return annotations; + } + + private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> { + @Override + public void event(SetEvent<DeviceId> event) { + final DeviceId deviceId = event.entry(); + final Device device = devices.get(deviceId); + if (device != null) { + notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device)); + } else { + pendingAvailableChangeUpdates.add(deviceId); + } + } + } + + private class InternalDeviceChangeEventListener + implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> { + @Override + public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) { + DeviceId deviceId = event.key().deviceId(); + ProviderId providerId = event.key().providerId(); + if (event.type() == PUT) { + notifyDelegate(refreshDeviceCache(providerId, deviceId)); + if (pendingAvailableChangeUpdates.remove(deviceId)) { + notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId))); + } + } else if (event.type() == REMOVE) { + notifyDelegate(purgeDeviceCache(deviceId)); + } + } + } + + private class InternalPortChangeEventListener + implements EventuallyConsistentMapListener<PortKey, PortDescription> { + @Override + public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) { + DeviceId deviceId = event.key().deviceId(); + ProviderId providerId = event.key().providerId(); + PortNumber portNumber = event.key().portNumber(); + if (event.type() == PUT) { + if (devices.containsKey(deviceId)) { + List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber)); + for (DeviceEvent deviceEvent : events) { + notifyDelegate(deviceEvent); + } + } + } else if (event.type() == REMOVE) { + log.warn("Unexpected port removed event"); + } + } + } + + private class InternalPortStatsListener + implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> { + @Override + public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) { + if (event.type() == PUT) { + Device device = devices.get(event.key()); + if (device != null) { + delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device)); + } + } + } + } +}
\ No newline at end of file |