aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java784
1 files changed, 784 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
new file mode 100644
index 00000000..2dae55bb
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
@@ -0,0 +1,784 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.device.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.ChassisId;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipTermService;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.AnnotationsUtil;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.OchPort;
+import org.onosproject.net.OduCltPort;
+import org.onosproject.net.OmsPort;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.Device.Type;
+import org.onosproject.net.device.DefaultPortStatistics;
+import org.onosproject.net.device.DeviceClockService;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceStore;
+import org.onosproject.net.device.DeviceStoreDelegate;
+import org.onosproject.net.device.OchPortDescription;
+import org.onosproject.net.device.OduCltPortDescription;
+import org.onosproject.net.device.OmsPortDescription;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+import org.onosproject.store.service.WallClockTimestamp;
+
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class ECDeviceStore
+ extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
+ implements DeviceStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+
+ private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
+ private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
+ Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
+
+ private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
+ private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
+ private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
+ private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
+
+ private DistributedSet<DeviceId> availableDevices;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipTermService mastershipTermService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceClockService deviceClockService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private NodeId localNodeId;
+ private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
+ new InternalDeviceChangeEventListener();
+ private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
+ new InternalPortChangeEventListener();
+ private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
+ new InternalPortStatsListener();
+ private final SetEventListener<DeviceId> deviceStatusTracker =
+ new InternalDeviceStatusTracker();
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+ .register(DeviceInjectedEvent.class)
+ .register(PortInjectedEvent.class)
+ .build();
+ }
+ };
+
+ protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(DeviceKey.class)
+ .register(PortKey.class)
+ .register(DeviceKey.class)
+ .register(PortKey.class)
+ .register(MastershipBasedTimestamp.class);
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+
+ deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
+ .withName("onos-device-descriptions")
+ .withSerializer(SERIALIZER_BUILDER)
+ .withTimestampProvider((k, v) -> {
+ try {
+ return deviceClockService.getTimestamp(k.deviceId());
+ } catch (IllegalStateException e) {
+ return null;
+ }
+ }).build();
+
+ portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
+ .withName("onos-port-descriptions")
+ .withSerializer(SERIALIZER_BUILDER)
+ .withTimestampProvider((k, v) -> {
+ try {
+ return deviceClockService.getTimestamp(k.deviceId());
+ } catch (IllegalStateException e) {
+ return null;
+ }
+ }).build();
+
+ devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
+ .withName("onos-port-stats")
+ .withSerializer(SERIALIZER_BUILDER)
+ .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
+ eventuallyConsistentMapBuilder()
+ .withName("onos-port-stats-delta")
+ .withSerializer(SERIALIZER_BUILDER)
+ .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ clusterCommunicator.addSubscriber(DEVICE_INJECTED,
+ SERIALIZER::decode,
+ this::injectDevice,
+ SERIALIZER::encode,
+ SharedExecutors.getPoolThreadExecutor());
+
+ clusterCommunicator.addSubscriber(PORT_INJECTED,
+ SERIALIZER::decode,
+ this::injectPort,
+ SERIALIZER::encode,
+ SharedExecutors.getPoolThreadExecutor());
+
+ availableDevices = storageService.<DeviceId>setBuilder()
+ .withName("onos-online-devices")
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .withPartitionsDisabled()
+ .withRelaxedReadConsistency()
+ .build();
+
+ deviceDescriptions.addListener(deviceUpdateListener);
+ portDescriptions.addListener(portUpdateListener);
+ devicePortStats.addListener(portStatsListener);
+ availableDevices.addListener(deviceStatusTracker);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ devicePortStats.removeListener(portStatsListener);
+ deviceDescriptions.removeListener(deviceUpdateListener);
+ portDescriptions.removeListener(portUpdateListener);
+ availableDevices.removeListener(deviceStatusTracker);
+ devicePortStats.destroy();
+ devicePortDeltaStats.destroy();
+ deviceDescriptions.destroy();
+ portDescriptions.destroy();
+ devices.clear();
+ devicePorts.clear();
+ clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
+ clusterCommunicator.removeSubscriber(PORT_INJECTED);
+ log.info("Stopped");
+ }
+
+ @Override
+ public Iterable<Device> getDevices() {
+ return devices.values();
+ }
+
+ @Override
+ public int getDeviceCount() {
+ return devices.size();
+ }
+
+ @Override
+ public Device getDevice(DeviceId deviceId) {
+ return devices.get(deviceId);
+ }
+
+ @Override
+ public DeviceEvent createOrUpdateDevice(ProviderId providerId,
+ DeviceId deviceId,
+ DeviceDescription deviceDescription) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (localNodeId.equals(master)) {
+ deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
+ return refreshDeviceCache(providerId, deviceId);
+ } else {
+ DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
+ return Futures.getUnchecked(
+ clusterCommunicator.sendAndReceive(deviceInjectedEvent,
+ DEVICE_INJECTED,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master));
+ }
+ }
+
+ private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
+ AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
+ Device device = devices.compute(deviceId, (k, existingDevice) -> {
+ Device newDevice = composeDevice(deviceId);
+ if (existingDevice == null) {
+ eventType.set(DEVICE_ADDED);
+ } else {
+ // We allow only certain attributes to trigger update
+ boolean propertiesChanged =
+ !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
+ !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
+ !Objects.equals(existingDevice.providerId(), newDevice.providerId());
+ boolean annotationsChanged =
+ !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
+
+ // Primary providers can respond to all changes, but ancillary ones
+ // should respond only to annotation changes.
+ if ((providerId.isAncillary() && annotationsChanged) ||
+ (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
+ boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
+ verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+ providerId, existingDevice, devices.get(deviceId), newDevice);
+ eventType.set(DEVICE_UPDATED);
+ }
+ }
+ return newDevice;
+ });
+ if (eventType.get() != null && !providerId.isAncillary()) {
+ markOnline(deviceId);
+ }
+ return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
+ }
+
+ /**
+ * Returns the primary providerId for a device.
+ * @param deviceId device identifier
+ * @return primary providerId
+ */
+ private Set<ProviderId> getAllProviders(DeviceId deviceId) {
+ return deviceDescriptions.keySet()
+ .stream()
+ .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
+ .map(deviceKey -> deviceKey.providerId())
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Returns the identifier for all providers for a device.
+ * @param deviceId device identifier
+ * @return set of provider identifiers
+ */
+ private ProviderId getPrimaryProviderId(DeviceId deviceId) {
+ Set<ProviderId> allProviderIds = getAllProviders(deviceId);
+ return allProviderIds.stream()
+ .filter(p -> !p.isAncillary())
+ .findFirst()
+ .orElse(Iterables.getFirst(allProviderIds, null));
+ }
+
+ /**
+ * Returns a Device, merging descriptions from multiple Providers.
+ *
+ * @param deviceId device identifier
+ * @return Device instance
+ */
+ private Device composeDevice(DeviceId deviceId) {
+
+ ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
+ DeviceDescription primaryDeviceDescription =
+ deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
+
+ Type type = primaryDeviceDescription.type();
+ String manufacturer = primaryDeviceDescription.manufacturer();
+ String hwVersion = primaryDeviceDescription.hwVersion();
+ String swVersion = primaryDeviceDescription.swVersion();
+ String serialNumber = primaryDeviceDescription.serialNumber();
+ ChassisId chassisId = primaryDeviceDescription.chassisId();
+ DefaultAnnotations annotations = mergeAnnotations(deviceId);
+
+ return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
+ hwVersion, swVersion, serialNumber,
+ chassisId, annotations);
+ }
+
+ private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
+ Device removedDevice = devices.remove(deviceId);
+ if (removedDevice != null) {
+ getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
+ return new DeviceEvent(DEVICE_REMOVED, removedDevice);
+ }
+ return null;
+ }
+
+ private boolean markOnline(DeviceId deviceId) {
+ return availableDevices.add(deviceId);
+ }
+
+ @Override
+ public DeviceEvent markOffline(DeviceId deviceId) {
+ availableDevices.remove(deviceId);
+ // set update listener will raise the event.
+ return null;
+ }
+
+ @Override
+ public List<DeviceEvent> updatePorts(ProviderId providerId,
+ DeviceId deviceId,
+ List<PortDescription> descriptions) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ List<DeviceEvent> deviceEvents = null;
+ if (localNodeId.equals(master)) {
+ descriptions.forEach(description -> {
+ PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
+ portDescriptions.put(portKey, description);
+ });
+ deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
+ } else {
+ if (master == null) {
+ return Collections.emptyList();
+ }
+ PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
+ deviceEvents = Futures.getUnchecked(
+ clusterCommunicator.sendAndReceive(portInjectedEvent,
+ PORT_INJECTED,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master));
+ }
+ return deviceEvents == null ? Collections.emptyList() : deviceEvents;
+ }
+
+ private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
+ DeviceId deviceId,
+ Optional<PortNumber> portNumber) {
+ Device device = devices.get(deviceId);
+ checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+ List<DeviceEvent> events = Lists.newArrayList();
+
+ Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
+ List<PortDescription> descriptions = Lists.newArrayList();
+ portDescriptions.entrySet().forEach(e -> {
+ PortKey key = e.getKey();
+ PortDescription value = e.getValue();
+ if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
+ if (portNumber.isPresent()) {
+ if (portNumber.get().equals(key.portNumber())) {
+ descriptions.add(value);
+ }
+ } else {
+ descriptions.add(value);
+ }
+ }
+ });
+
+ for (PortDescription description : descriptions) {
+ final PortNumber number = description.portNumber();
+ ports.compute(number, (k, existingPort) -> {
+ Port newPort = composePort(device, number);
+ if (existingPort == null) {
+ events.add(new DeviceEvent(PORT_ADDED, device, newPort));
+ } else {
+ if (existingPort.isEnabled() != newPort.isEnabled() ||
+ existingPort.type() != newPort.type() ||
+ existingPort.portSpeed() != newPort.portSpeed() ||
+ !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
+ events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
+ }
+ }
+ return newPort;
+ });
+ }
+
+ return events;
+ }
+
+ /**
+ * Returns a Port, merging descriptions from multiple Providers.
+ *
+ * @param device device the port is on
+ * @param number port number
+ * @return Port instance
+ */
+ private Port composePort(Device device, PortNumber number) {
+
+ Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
+ portDescriptions.entrySet().forEach(entry -> {
+ PortKey portKey = entry.getKey();
+ if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
+ descriptions.put(portKey.providerId(), entry.getValue());
+ }
+ });
+ ProviderId primary = getPrimaryProviderId(device.id());
+ PortDescription primaryDescription = descriptions.get(primary);
+
+ // if no primary, assume not enabled
+ boolean isEnabled = false;
+ DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+ if (primaryDescription != null) {
+ isEnabled = primaryDescription.isEnabled();
+ annotations = merge(annotations, primaryDescription.annotations());
+ }
+ Port updated = null;
+ for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
+ if (e.getKey().equals(primary)) {
+ continue;
+ }
+ annotations = merge(annotations, e.getValue().annotations());
+ updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
+ }
+ if (primaryDescription == null) {
+ return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
+ }
+ return updated == null
+ ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
+ : updated;
+ }
+
+ private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
+ PortDescription description, Annotations annotations) {
+ switch (description.type()) {
+ case OMS:
+ OmsPortDescription omsDesc = (OmsPortDescription) description;
+ return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
+ omsDesc.maxFrequency(), omsDesc.grid(), annotations);
+ case OCH:
+ OchPortDescription ochDesc = (OchPortDescription) description;
+ return new OchPort(device, number, isEnabled, ochDesc.signalType(),
+ ochDesc.isTunable(), ochDesc.lambda(), annotations);
+ case ODUCLT:
+ OduCltPortDescription oduDesc = (OduCltPortDescription) description;
+ return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
+ default:
+ return new DefaultPort(device, number, isEnabled, description.type(),
+ description.portSpeed(), annotations);
+ }
+ }
+
+ @Override
+ public DeviceEvent updatePortStatus(ProviderId providerId,
+ DeviceId deviceId,
+ PortDescription portDescription) {
+ portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
+ List<DeviceEvent> events =
+ refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
+ return Iterables.getFirst(events, null);
+ }
+
+ @Override
+ public List<Port> getPorts(DeviceId deviceId) {
+ return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
+ }
+
+ @Override
+ public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+ return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
+ }
+
+ @Override
+ public DeviceEvent updatePortStatistics(ProviderId providerId,
+ DeviceId deviceId,
+ Collection<PortStatistics> newStatsCollection) {
+
+ Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
+ Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
+ Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
+
+ if (prvStatsMap != null) {
+ for (PortStatistics newStats : newStatsCollection) {
+ PortNumber port = PortNumber.portNumber(newStats.port());
+ PortStatistics prvStats = prvStatsMap.get(port);
+ DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
+ PortStatistics deltaStats = builder.build();
+ if (prvStats != null) {
+ deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
+ }
+ deltaStatsMap.put(port, deltaStats);
+ newStatsMap.put(port, newStats);
+ }
+ } else {
+ for (PortStatistics newStats : newStatsCollection) {
+ PortNumber port = PortNumber.portNumber(newStats.port());
+ newStatsMap.put(port, newStats);
+ }
+ }
+ devicePortDeltaStats.put(deviceId, deltaStatsMap);
+ devicePortStats.put(deviceId, newStatsMap);
+ // DeviceEvent returns null because of InternalPortStatsListener usage
+ return null;
+ }
+
+ /**
+ * Calculate delta statistics by subtracting previous from new statistics.
+ *
+ * @param deviceId device indentifier
+ * @param prvStats previous port statistics
+ * @param newStats new port statistics
+ * @return PortStatistics
+ */
+ public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
+ // calculate time difference
+ long deltaStatsSec, deltaStatsNano;
+ if (newStats.durationNano() < prvStats.durationNano()) {
+ deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
+ deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
+ } else {
+ deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
+ deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
+ }
+ DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
+ DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
+ .setPort(newStats.port())
+ .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
+ .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
+ .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
+ .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
+ .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
+ .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
+ .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
+ .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
+ .setDurationSec(deltaStatsSec)
+ .setDurationNano(deltaStatsNano)
+ .build();
+ return deltaStats;
+ }
+
+ @Override
+ public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
+ Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
+ if (portStats == null) {
+ return Collections.emptyList();
+ }
+ return ImmutableList.copyOf(portStats.values());
+ }
+
+ @Override
+ public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
+ Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
+ if (portStats == null) {
+ return Collections.emptyList();
+ }
+ return ImmutableList.copyOf(portStats.values());
+ }
+
+ @Override
+ public boolean isAvailable(DeviceId deviceId) {
+ return availableDevices.contains(deviceId);
+ }
+
+ @Override
+ public Iterable<Device> getAvailableDevices() {
+ return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
+ }
+
+ @Override
+ public DeviceEvent removeDevice(DeviceId deviceId) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ // if there exist a master, forward
+ // if there is no master, try to become one and process
+ boolean relinquishAtEnd = false;
+ if (master == null) {
+ final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
+ if (myRole != MastershipRole.NONE) {
+ relinquishAtEnd = true;
+ }
+ log.debug("Temporarily requesting role for {} to remove", deviceId);
+ MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
+ if (role == MastershipRole.MASTER) {
+ master = localNodeId;
+ }
+ }
+
+ if (!localNodeId.equals(master)) {
+ log.debug("{} has control of {}, forwarding remove request",
+ master, deviceId);
+
+ clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
+ .whenComplete((r, e) -> {
+ if (e != null) {
+ log.error("Failed to forward {} remove request to its master", deviceId, e);
+ }
+ });
+ return null;
+ }
+
+ // I have control..
+ DeviceEvent event = null;
+ final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
+ DeviceDescription removedDeviceDescription =
+ deviceDescriptions.remove(deviceKey);
+ if (removedDeviceDescription != null) {
+ event = purgeDeviceCache(deviceId);
+ }
+
+ if (relinquishAtEnd) {
+ log.debug("Relinquishing temporary role acquired for {}", deviceId);
+ mastershipService.relinquishMastership(deviceId);
+ }
+ return event;
+ }
+
+ private DeviceEvent injectDevice(DeviceInjectedEvent event) {
+ return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
+ }
+
+ private List<DeviceEvent> injectPort(PortInjectedEvent event) {
+ return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
+ }
+
+ private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
+ ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
+ DeviceDescription primaryDeviceDescription =
+ deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
+ DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+ annotations = merge(annotations, primaryDeviceDescription.annotations());
+ for (ProviderId providerId : getAllProviders(deviceId)) {
+ if (!providerId.equals(primaryProviderId)) {
+ annotations = merge(annotations,
+ deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
+ }
+ }
+ return annotations;
+ }
+
+ private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
+ @Override
+ public void event(SetEvent<DeviceId> event) {
+ final DeviceId deviceId = event.entry();
+ final Device device = devices.get(deviceId);
+ if (device != null) {
+ notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
+ } else {
+ pendingAvailableChangeUpdates.add(deviceId);
+ }
+ }
+ }
+
+ private class InternalDeviceChangeEventListener
+ implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
+ DeviceId deviceId = event.key().deviceId();
+ ProviderId providerId = event.key().providerId();
+ if (event.type() == PUT) {
+ notifyDelegate(refreshDeviceCache(providerId, deviceId));
+ if (pendingAvailableChangeUpdates.remove(deviceId)) {
+ notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
+ }
+ } else if (event.type() == REMOVE) {
+ notifyDelegate(purgeDeviceCache(deviceId));
+ }
+ }
+ }
+
+ private class InternalPortChangeEventListener
+ implements EventuallyConsistentMapListener<PortKey, PortDescription> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
+ DeviceId deviceId = event.key().deviceId();
+ ProviderId providerId = event.key().providerId();
+ PortNumber portNumber = event.key().portNumber();
+ if (event.type() == PUT) {
+ if (devices.containsKey(deviceId)) {
+ List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
+ for (DeviceEvent deviceEvent : events) {
+ notifyDelegate(deviceEvent);
+ }
+ }
+ } else if (event.type() == REMOVE) {
+ log.warn("Unexpected port removed event");
+ }
+ }
+ }
+
+ private class InternalPortStatsListener
+ implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
+ if (event.type() == PUT) {
+ Device device = devices.get(event.key());
+ if (device != null) {
+ delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
+ }
+ }
+ }
+ }
+} \ No newline at end of file