diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java | 784 |
1 files changed, 0 insertions, 784 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java deleted file mode 100644 index 2dae55bb..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java +++ /dev/null @@ -1,784 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.device.impl; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Verify.verify; -import static org.onosproject.net.DefaultAnnotations.merge; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.packet.ChassisId; -import org.onlab.util.KryoNamespace; -import org.onlab.util.SharedExecutors; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.mastership.MastershipService; -import org.onosproject.mastership.MastershipTermService; -import org.onosproject.net.Annotations; -import org.onosproject.net.AnnotationsUtil; -import org.onosproject.net.DefaultAnnotations; -import org.onosproject.net.DefaultDevice; -import org.onosproject.net.DefaultPort; -import org.onosproject.net.Device; -import org.onosproject.net.DeviceId; -import org.onosproject.net.MastershipRole; -import org.onosproject.net.OchPort; -import org.onosproject.net.OduCltPort; -import org.onosproject.net.OmsPort; -import org.onosproject.net.Port; -import org.onosproject.net.PortNumber; -import org.onosproject.net.Device.Type; -import org.onosproject.net.device.DefaultPortStatistics; -import org.onosproject.net.device.DeviceClockService; -import org.onosproject.net.device.DeviceDescription; -import org.onosproject.net.device.DeviceEvent; -import org.onosproject.net.device.DeviceStore; -import org.onosproject.net.device.DeviceStoreDelegate; -import org.onosproject.net.device.OchPortDescription; -import org.onosproject.net.device.OduCltPortDescription; -import org.onosproject.net.device.OmsPortDescription; -import org.onosproject.net.device.PortDescription; -import org.onosproject.net.device.PortStatistics; -import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.AbstractStore; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.impl.MastershipBasedTimestamp; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.serializers.custom.DistributedStoreSerializers; -import org.onosproject.store.service.DistributedSet; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.SetEvent; -import org.onosproject.store.service.SetEventListener; -import org.onosproject.store.service.WallClockTimestamp; - -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; - -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.StorageService; -import org.slf4j.Logger; - -import static org.onosproject.net.device.DeviceEvent.Type.*; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ; -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; - -/** - * Manages the inventory of devices using a {@code EventuallyConsistentMap}. - */ -@Component(immediate = true, enabled = false) -@Service -public class ECDeviceStore - extends AbstractStore<DeviceEvent, DeviceStoreDelegate> - implements DeviceStore { - - private final Logger log = getLogger(getClass()); - - private static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; - - private final Map<DeviceId, Device> devices = Maps.newConcurrentMap(); - private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap(); - Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet(); - - private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions; - private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions; - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats; - private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats; - - private DistributedSet<DeviceId> availableDevices; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipTermService mastershipTermService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected DeviceClockService deviceClockService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - private NodeId localNodeId; - private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener = - new InternalDeviceChangeEventListener(); - private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener = - new InternalPortChangeEventListener(); - private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener = - new InternalPortStatsListener(); - private final SetEventListener<DeviceId> deviceStatusTracker = - new InternalDeviceStatusTracker(); - - protected static final KryoSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(DistributedStoreSerializers.STORE_COMMON) - .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) - .register(DeviceInjectedEvent.class) - .register(PortInjectedEvent.class) - .build(); - } - }; - - protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder() - .register(KryoNamespaces.API) - .register(DeviceKey.class) - .register(PortKey.class) - .register(DeviceKey.class) - .register(PortKey.class) - .register(MastershipBasedTimestamp.class); - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - - deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder() - .withName("onos-device-descriptions") - .withSerializer(SERIALIZER_BUILDER) - .withTimestampProvider((k, v) -> { - try { - return deviceClockService.getTimestamp(k.deviceId()); - } catch (IllegalStateException e) { - return null; - } - }).build(); - - portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder() - .withName("onos-port-descriptions") - .withSerializer(SERIALIZER_BUILDER) - .withTimestampProvider((k, v) -> { - try { - return deviceClockService.getTimestamp(k.deviceId()); - } catch (IllegalStateException e) { - return null; - } - }).build(); - - devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder() - .withName("onos-port-stats") - .withSerializer(SERIALIZER_BUILDER) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - - devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>> - eventuallyConsistentMapBuilder() - .withName("onos-port-stats-delta") - .withSerializer(SERIALIZER_BUILDER) - .withAntiEntropyPeriod(5, TimeUnit.SECONDS) - .withTimestampProvider((k, v) -> new WallClockTimestamp()) - .withTombstonesDisabled() - .build(); - - clusterCommunicator.addSubscriber(DEVICE_INJECTED, - SERIALIZER::decode, - this::injectDevice, - SERIALIZER::encode, - SharedExecutors.getPoolThreadExecutor()); - - clusterCommunicator.addSubscriber(PORT_INJECTED, - SERIALIZER::decode, - this::injectPort, - SERIALIZER::encode, - SharedExecutors.getPoolThreadExecutor()); - - availableDevices = storageService.<DeviceId>setBuilder() - .withName("onos-online-devices") - .withSerializer(Serializer.using(KryoNamespaces.API)) - .withPartitionsDisabled() - .withRelaxedReadConsistency() - .build(); - - deviceDescriptions.addListener(deviceUpdateListener); - portDescriptions.addListener(portUpdateListener); - devicePortStats.addListener(portStatsListener); - availableDevices.addListener(deviceStatusTracker); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - devicePortStats.removeListener(portStatsListener); - deviceDescriptions.removeListener(deviceUpdateListener); - portDescriptions.removeListener(portUpdateListener); - availableDevices.removeListener(deviceStatusTracker); - devicePortStats.destroy(); - devicePortDeltaStats.destroy(); - deviceDescriptions.destroy(); - portDescriptions.destroy(); - devices.clear(); - devicePorts.clear(); - clusterCommunicator.removeSubscriber(DEVICE_INJECTED); - clusterCommunicator.removeSubscriber(PORT_INJECTED); - log.info("Stopped"); - } - - @Override - public Iterable<Device> getDevices() { - return devices.values(); - } - - @Override - public int getDeviceCount() { - return devices.size(); - } - - @Override - public Device getDevice(DeviceId deviceId) { - return devices.get(deviceId); - } - - @Override - public DeviceEvent createOrUpdateDevice(ProviderId providerId, - DeviceId deviceId, - DeviceDescription deviceDescription) { - NodeId master = mastershipService.getMasterFor(deviceId); - if (localNodeId.equals(master)) { - deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription); - return refreshDeviceCache(providerId, deviceId); - } else { - DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription); - return Futures.getUnchecked( - clusterCommunicator.sendAndReceive(deviceInjectedEvent, - DEVICE_INJECTED, - SERIALIZER::encode, - SERIALIZER::decode, - master)); - } - } - - private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) { - AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>(); - Device device = devices.compute(deviceId, (k, existingDevice) -> { - Device newDevice = composeDevice(deviceId); - if (existingDevice == null) { - eventType.set(DEVICE_ADDED); - } else { - // We allow only certain attributes to trigger update - boolean propertiesChanged = - !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) || - !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) || - !Objects.equals(existingDevice.providerId(), newDevice.providerId()); - boolean annotationsChanged = - !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations()); - - // Primary providers can respond to all changes, but ancillary ones - // should respond only to annotation changes. - if ((providerId.isAncillary() && annotationsChanged) || - (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) { - boolean replaced = devices.replace(deviceId, existingDevice, newDevice); - verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", - providerId, existingDevice, devices.get(deviceId), newDevice); - eventType.set(DEVICE_UPDATED); - } - } - return newDevice; - }); - if (eventType.get() != null && !providerId.isAncillary()) { - markOnline(deviceId); - } - return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null; - } - - /** - * Returns the primary providerId for a device. - * @param deviceId device identifier - * @return primary providerId - */ - private Set<ProviderId> getAllProviders(DeviceId deviceId) { - return deviceDescriptions.keySet() - .stream() - .filter(deviceKey -> deviceKey.deviceId().equals(deviceId)) - .map(deviceKey -> deviceKey.providerId()) - .collect(Collectors.toSet()); - } - - /** - * Returns the identifier for all providers for a device. - * @param deviceId device identifier - * @return set of provider identifiers - */ - private ProviderId getPrimaryProviderId(DeviceId deviceId) { - Set<ProviderId> allProviderIds = getAllProviders(deviceId); - return allProviderIds.stream() - .filter(p -> !p.isAncillary()) - .findFirst() - .orElse(Iterables.getFirst(allProviderIds, null)); - } - - /** - * Returns a Device, merging descriptions from multiple Providers. - * - * @param deviceId device identifier - * @return Device instance - */ - private Device composeDevice(DeviceId deviceId) { - - ProviderId primaryProviderId = getPrimaryProviderId(deviceId); - DeviceDescription primaryDeviceDescription = - deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId)); - - Type type = primaryDeviceDescription.type(); - String manufacturer = primaryDeviceDescription.manufacturer(); - String hwVersion = primaryDeviceDescription.hwVersion(); - String swVersion = primaryDeviceDescription.swVersion(); - String serialNumber = primaryDeviceDescription.serialNumber(); - ChassisId chassisId = primaryDeviceDescription.chassisId(); - DefaultAnnotations annotations = mergeAnnotations(deviceId); - - return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer, - hwVersion, swVersion, serialNumber, - chassisId, annotations); - } - - private DeviceEvent purgeDeviceCache(DeviceId deviceId) { - Device removedDevice = devices.remove(deviceId); - if (removedDevice != null) { - getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId))); - return new DeviceEvent(DEVICE_REMOVED, removedDevice); - } - return null; - } - - private boolean markOnline(DeviceId deviceId) { - return availableDevices.add(deviceId); - } - - @Override - public DeviceEvent markOffline(DeviceId deviceId) { - availableDevices.remove(deviceId); - // set update listener will raise the event. - return null; - } - - @Override - public List<DeviceEvent> updatePorts(ProviderId providerId, - DeviceId deviceId, - List<PortDescription> descriptions) { - NodeId master = mastershipService.getMasterFor(deviceId); - List<DeviceEvent> deviceEvents = null; - if (localNodeId.equals(master)) { - descriptions.forEach(description -> { - PortKey portKey = new PortKey(providerId, deviceId, description.portNumber()); - portDescriptions.put(portKey, description); - }); - deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty()); - } else { - if (master == null) { - return Collections.emptyList(); - } - PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions); - deviceEvents = Futures.getUnchecked( - clusterCommunicator.sendAndReceive(portInjectedEvent, - PORT_INJECTED, - SERIALIZER::encode, - SERIALIZER::decode, - master)); - } - return deviceEvents == null ? Collections.emptyList() : deviceEvents; - } - - private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId, - DeviceId deviceId, - Optional<PortNumber> portNumber) { - Device device = devices.get(deviceId); - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); - List<DeviceEvent> events = Lists.newArrayList(); - - Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap()); - List<PortDescription> descriptions = Lists.newArrayList(); - portDescriptions.entrySet().forEach(e -> { - PortKey key = e.getKey(); - PortDescription value = e.getValue(); - if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) { - if (portNumber.isPresent()) { - if (portNumber.get().equals(key.portNumber())) { - descriptions.add(value); - } - } else { - descriptions.add(value); - } - } - }); - - for (PortDescription description : descriptions) { - final PortNumber number = description.portNumber(); - ports.compute(number, (k, existingPort) -> { - Port newPort = composePort(device, number); - if (existingPort == null) { - events.add(new DeviceEvent(PORT_ADDED, device, newPort)); - } else { - if (existingPort.isEnabled() != newPort.isEnabled() || - existingPort.type() != newPort.type() || - existingPort.portSpeed() != newPort.portSpeed() || - !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) { - events.add(new DeviceEvent(PORT_UPDATED, device, newPort)); - } - } - return newPort; - }); - } - - return events; - } - - /** - * Returns a Port, merging descriptions from multiple Providers. - * - * @param device device the port is on - * @param number port number - * @return Port instance - */ - private Port composePort(Device device, PortNumber number) { - - Map<ProviderId, PortDescription> descriptions = Maps.newHashMap(); - portDescriptions.entrySet().forEach(entry -> { - PortKey portKey = entry.getKey(); - if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) { - descriptions.put(portKey.providerId(), entry.getValue()); - } - }); - ProviderId primary = getPrimaryProviderId(device.id()); - PortDescription primaryDescription = descriptions.get(primary); - - // if no primary, assume not enabled - boolean isEnabled = false; - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - if (primaryDescription != null) { - isEnabled = primaryDescription.isEnabled(); - annotations = merge(annotations, primaryDescription.annotations()); - } - Port updated = null; - for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) { - if (e.getKey().equals(primary)) { - continue; - } - annotations = merge(annotations, e.getValue().annotations()); - updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations); - } - if (primaryDescription == null) { - return updated == null ? new DefaultPort(device, number, false, annotations) : updated; - } - return updated == null - ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations) - : updated; - } - - private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled, - PortDescription description, Annotations annotations) { - switch (description.type()) { - case OMS: - OmsPortDescription omsDesc = (OmsPortDescription) description; - return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(), - omsDesc.maxFrequency(), omsDesc.grid(), annotations); - case OCH: - OchPortDescription ochDesc = (OchPortDescription) description; - return new OchPort(device, number, isEnabled, ochDesc.signalType(), - ochDesc.isTunable(), ochDesc.lambda(), annotations); - case ODUCLT: - OduCltPortDescription oduDesc = (OduCltPortDescription) description; - return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations); - default: - return new DefaultPort(device, number, isEnabled, description.type(), - description.portSpeed(), annotations); - } - } - - @Override - public DeviceEvent updatePortStatus(ProviderId providerId, - DeviceId deviceId, - PortDescription portDescription) { - portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription); - List<DeviceEvent> events = - refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber())); - return Iterables.getFirst(events, null); - } - - @Override - public List<Port> getPorts(DeviceId deviceId) { - return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values()); - } - - @Override - public Port getPort(DeviceId deviceId, PortNumber portNumber) { - return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber); - } - - @Override - public DeviceEvent updatePortStatistics(ProviderId providerId, - DeviceId deviceId, - Collection<PortStatistics> newStatsCollection) { - - Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId); - Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap(); - Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap(); - - if (prvStatsMap != null) { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - PortStatistics prvStats = prvStatsMap.get(port); - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - PortStatistics deltaStats = builder.build(); - if (prvStats != null) { - deltaStats = calcDeltaStats(deviceId, prvStats, newStats); - } - deltaStatsMap.put(port, deltaStats); - newStatsMap.put(port, newStats); - } - } else { - for (PortStatistics newStats : newStatsCollection) { - PortNumber port = PortNumber.portNumber(newStats.port()); - newStatsMap.put(port, newStats); - } - } - devicePortDeltaStats.put(deviceId, deltaStatsMap); - devicePortStats.put(deviceId, newStatsMap); - // DeviceEvent returns null because of InternalPortStatsListener usage - return null; - } - - /** - * Calculate delta statistics by subtracting previous from new statistics. - * - * @param deviceId device indentifier - * @param prvStats previous port statistics - * @param newStats new port statistics - * @return PortStatistics - */ - public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) { - // calculate time difference - long deltaStatsSec, deltaStatsNano; - if (newStats.durationNano() < prvStats.durationNano()) { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L; - } else { - deltaStatsNano = newStats.durationNano() - prvStats.durationNano(); - deltaStatsSec = newStats.durationSec() - prvStats.durationSec(); - } - DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder(); - DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId) - .setPort(newStats.port()) - .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived()) - .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent()) - .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived()) - .setBytesSent(newStats.bytesSent() - prvStats.bytesSent()) - .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped()) - .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped()) - .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors()) - .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors()) - .setDurationSec(deltaStatsSec) - .setDurationNano(deltaStatsNano) - .build(); - return deltaStats; - } - - @Override - public List<PortStatistics> getPortStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) { - Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId); - if (portStats == null) { - return Collections.emptyList(); - } - return ImmutableList.copyOf(portStats.values()); - } - - @Override - public boolean isAvailable(DeviceId deviceId) { - return availableDevices.contains(deviceId); - } - - @Override - public Iterable<Device> getAvailableDevices() { - return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null); - } - - @Override - public DeviceEvent removeDevice(DeviceId deviceId) { - NodeId master = mastershipService.getMasterFor(deviceId); - // if there exist a master, forward - // if there is no master, try to become one and process - boolean relinquishAtEnd = false; - if (master == null) { - final MastershipRole myRole = mastershipService.getLocalRole(deviceId); - if (myRole != MastershipRole.NONE) { - relinquishAtEnd = true; - } - log.debug("Temporarily requesting role for {} to remove", deviceId); - MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId)); - if (role == MastershipRole.MASTER) { - master = localNodeId; - } - } - - if (!localNodeId.equals(master)) { - log.debug("{} has control of {}, forwarding remove request", - master, deviceId); - - clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master) - .whenComplete((r, e) -> { - if (e != null) { - log.error("Failed to forward {} remove request to its master", deviceId, e); - } - }); - return null; - } - - // I have control.. - DeviceEvent event = null; - final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId); - DeviceDescription removedDeviceDescription = - deviceDescriptions.remove(deviceKey); - if (removedDeviceDescription != null) { - event = purgeDeviceCache(deviceId); - } - - if (relinquishAtEnd) { - log.debug("Relinquishing temporary role acquired for {}", deviceId); - mastershipService.relinquishMastership(deviceId); - } - return event; - } - - private DeviceEvent injectDevice(DeviceInjectedEvent event) { - return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription()); - } - - private List<DeviceEvent> injectPort(PortInjectedEvent event) { - return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions()); - } - - private DefaultAnnotations mergeAnnotations(DeviceId deviceId) { - ProviderId primaryProviderId = getPrimaryProviderId(deviceId); - DeviceDescription primaryDeviceDescription = - deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId)); - DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - annotations = merge(annotations, primaryDeviceDescription.annotations()); - for (ProviderId providerId : getAllProviders(deviceId)) { - if (!providerId.equals(primaryProviderId)) { - annotations = merge(annotations, - deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations()); - } - } - return annotations; - } - - private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> { - @Override - public void event(SetEvent<DeviceId> event) { - final DeviceId deviceId = event.entry(); - final Device device = devices.get(deviceId); - if (device != null) { - notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device)); - } else { - pendingAvailableChangeUpdates.add(deviceId); - } - } - } - - private class InternalDeviceChangeEventListener - implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> { - @Override - public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) { - DeviceId deviceId = event.key().deviceId(); - ProviderId providerId = event.key().providerId(); - if (event.type() == PUT) { - notifyDelegate(refreshDeviceCache(providerId, deviceId)); - if (pendingAvailableChangeUpdates.remove(deviceId)) { - notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId))); - } - } else if (event.type() == REMOVE) { - notifyDelegate(purgeDeviceCache(deviceId)); - } - } - } - - private class InternalPortChangeEventListener - implements EventuallyConsistentMapListener<PortKey, PortDescription> { - @Override - public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) { - DeviceId deviceId = event.key().deviceId(); - ProviderId providerId = event.key().providerId(); - PortNumber portNumber = event.key().portNumber(); - if (event.type() == PUT) { - if (devices.containsKey(deviceId)) { - List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber)); - for (DeviceEvent deviceEvent : events) { - notifyDelegate(deviceEvent); - } - } - } else if (event.type() == REMOVE) { - log.warn("Unexpected port removed event"); - } - } - } - - private class InternalPortStatsListener - implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> { - @Override - public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) { - if (event.type() == PUT) { - Device device = devices.get(event.key()); - if (device != null) { - delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device)); - } - } - } - } -}
\ No newline at end of file |