summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java784
1 files changed, 0 insertions, 784 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
deleted file mode 100644
index 2dae55bb..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.device.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verify;
-import static org.onosproject.net.DefaultAnnotations.merge;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.ChassisId;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.SharedExecutors;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.mastership.MastershipTermService;
-import org.onosproject.net.Annotations;
-import org.onosproject.net.AnnotationsUtil;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.DefaultDevice;
-import org.onosproject.net.DefaultPort;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.net.OchPort;
-import org.onosproject.net.OduCltPort;
-import org.onosproject.net.OmsPort;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.Device.Type;
-import org.onosproject.net.device.DefaultPortStatistics;
-import org.onosproject.net.device.DeviceClockService;
-import org.onosproject.net.device.DeviceDescription;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceStore;
-import org.onosproject.net.device.DeviceStoreDelegate;
-import org.onosproject.net.device.OchPortDescription;
-import org.onosproject.net.device.OduCltPortDescription;
-import org.onosproject.net.device.OmsPortDescription;
-import org.onosproject.net.device.PortDescription;
-import org.onosproject.net.device.PortStatistics;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.impl.MastershipBasedTimestamp;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
-import org.onosproject.store.service.DistributedSet;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.SetEvent;
-import org.onosproject.store.service.SetEventListener;
-import org.onosproject.store.service.WallClockTimestamp;
-
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-
-import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
-/**
- * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class ECDeviceStore
- extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
- implements DeviceStore {
-
- private final Logger log = getLogger(getClass());
-
- private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
-
- private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
- private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
- Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
-
- private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
- private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
- private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
- private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
-
- private DistributedSet<DeviceId> availableDevices;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipTermService mastershipTermService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceClockService deviceClockService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- private NodeId localNodeId;
- private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
- new InternalDeviceChangeEventListener();
- private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
- new InternalPortChangeEventListener();
- private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
- new InternalPortStatsListener();
- private final SetEventListener<DeviceId> deviceStatusTracker =
- new InternalDeviceStatusTracker();
-
- protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.STORE_COMMON)
- .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
- .register(DeviceInjectedEvent.class)
- .register(PortInjectedEvent.class)
- .build();
- }
- };
-
- protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(DeviceKey.class)
- .register(PortKey.class)
- .register(DeviceKey.class)
- .register(PortKey.class)
- .register(MastershipBasedTimestamp.class);
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
-
- deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
- .withName("onos-device-descriptions")
- .withSerializer(SERIALIZER_BUILDER)
- .withTimestampProvider((k, v) -> {
- try {
- return deviceClockService.getTimestamp(k.deviceId());
- } catch (IllegalStateException e) {
- return null;
- }
- }).build();
-
- portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
- .withName("onos-port-descriptions")
- .withSerializer(SERIALIZER_BUILDER)
- .withTimestampProvider((k, v) -> {
- try {
- return deviceClockService.getTimestamp(k.deviceId());
- } catch (IllegalStateException e) {
- return null;
- }
- }).build();
-
- devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
- .withName("onos-port-stats")
- .withSerializer(SERIALIZER_BUILDER)
- .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .withTombstonesDisabled()
- .build();
-
- devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
- eventuallyConsistentMapBuilder()
- .withName("onos-port-stats-delta")
- .withSerializer(SERIALIZER_BUILDER)
- .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .withTombstonesDisabled()
- .build();
-
- clusterCommunicator.addSubscriber(DEVICE_INJECTED,
- SERIALIZER::decode,
- this::injectDevice,
- SERIALIZER::encode,
- SharedExecutors.getPoolThreadExecutor());
-
- clusterCommunicator.addSubscriber(PORT_INJECTED,
- SERIALIZER::decode,
- this::injectPort,
- SERIALIZER::encode,
- SharedExecutors.getPoolThreadExecutor());
-
- availableDevices = storageService.<DeviceId>setBuilder()
- .withName("onos-online-devices")
- .withSerializer(Serializer.using(KryoNamespaces.API))
- .withPartitionsDisabled()
- .withRelaxedReadConsistency()
- .build();
-
- deviceDescriptions.addListener(deviceUpdateListener);
- portDescriptions.addListener(portUpdateListener);
- devicePortStats.addListener(portStatsListener);
- availableDevices.addListener(deviceStatusTracker);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- devicePortStats.removeListener(portStatsListener);
- deviceDescriptions.removeListener(deviceUpdateListener);
- portDescriptions.removeListener(portUpdateListener);
- availableDevices.removeListener(deviceStatusTracker);
- devicePortStats.destroy();
- devicePortDeltaStats.destroy();
- deviceDescriptions.destroy();
- portDescriptions.destroy();
- devices.clear();
- devicePorts.clear();
- clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
- clusterCommunicator.removeSubscriber(PORT_INJECTED);
- log.info("Stopped");
- }
-
- @Override
- public Iterable<Device> getDevices() {
- return devices.values();
- }
-
- @Override
- public int getDeviceCount() {
- return devices.size();
- }
-
- @Override
- public Device getDevice(DeviceId deviceId) {
- return devices.get(deviceId);
- }
-
- @Override
- public DeviceEvent createOrUpdateDevice(ProviderId providerId,
- DeviceId deviceId,
- DeviceDescription deviceDescription) {
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (localNodeId.equals(master)) {
- deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
- return refreshDeviceCache(providerId, deviceId);
- } else {
- DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
- return Futures.getUnchecked(
- clusterCommunicator.sendAndReceive(deviceInjectedEvent,
- DEVICE_INJECTED,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master));
- }
- }
-
- private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
- AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
- Device device = devices.compute(deviceId, (k, existingDevice) -> {
- Device newDevice = composeDevice(deviceId);
- if (existingDevice == null) {
- eventType.set(DEVICE_ADDED);
- } else {
- // We allow only certain attributes to trigger update
- boolean propertiesChanged =
- !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
- !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
- !Objects.equals(existingDevice.providerId(), newDevice.providerId());
- boolean annotationsChanged =
- !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
-
- // Primary providers can respond to all changes, but ancillary ones
- // should respond only to annotation changes.
- if ((providerId.isAncillary() && annotationsChanged) ||
- (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
- boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
- verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
- providerId, existingDevice, devices.get(deviceId), newDevice);
- eventType.set(DEVICE_UPDATED);
- }
- }
- return newDevice;
- });
- if (eventType.get() != null && !providerId.isAncillary()) {
- markOnline(deviceId);
- }
- return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
- }
-
- /**
- * Returns the primary providerId for a device.
- * @param deviceId device identifier
- * @return primary providerId
- */
- private Set<ProviderId> getAllProviders(DeviceId deviceId) {
- return deviceDescriptions.keySet()
- .stream()
- .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
- .map(deviceKey -> deviceKey.providerId())
- .collect(Collectors.toSet());
- }
-
- /**
- * Returns the identifier for all providers for a device.
- * @param deviceId device identifier
- * @return set of provider identifiers
- */
- private ProviderId getPrimaryProviderId(DeviceId deviceId) {
- Set<ProviderId> allProviderIds = getAllProviders(deviceId);
- return allProviderIds.stream()
- .filter(p -> !p.isAncillary())
- .findFirst()
- .orElse(Iterables.getFirst(allProviderIds, null));
- }
-
- /**
- * Returns a Device, merging descriptions from multiple Providers.
- *
- * @param deviceId device identifier
- * @return Device instance
- */
- private Device composeDevice(DeviceId deviceId) {
-
- ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
- DeviceDescription primaryDeviceDescription =
- deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
-
- Type type = primaryDeviceDescription.type();
- String manufacturer = primaryDeviceDescription.manufacturer();
- String hwVersion = primaryDeviceDescription.hwVersion();
- String swVersion = primaryDeviceDescription.swVersion();
- String serialNumber = primaryDeviceDescription.serialNumber();
- ChassisId chassisId = primaryDeviceDescription.chassisId();
- DefaultAnnotations annotations = mergeAnnotations(deviceId);
-
- return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
- hwVersion, swVersion, serialNumber,
- chassisId, annotations);
- }
-
- private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
- Device removedDevice = devices.remove(deviceId);
- if (removedDevice != null) {
- getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
- return new DeviceEvent(DEVICE_REMOVED, removedDevice);
- }
- return null;
- }
-
- private boolean markOnline(DeviceId deviceId) {
- return availableDevices.add(deviceId);
- }
-
- @Override
- public DeviceEvent markOffline(DeviceId deviceId) {
- availableDevices.remove(deviceId);
- // set update listener will raise the event.
- return null;
- }
-
- @Override
- public List<DeviceEvent> updatePorts(ProviderId providerId,
- DeviceId deviceId,
- List<PortDescription> descriptions) {
- NodeId master = mastershipService.getMasterFor(deviceId);
- List<DeviceEvent> deviceEvents = null;
- if (localNodeId.equals(master)) {
- descriptions.forEach(description -> {
- PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
- portDescriptions.put(portKey, description);
- });
- deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
- } else {
- if (master == null) {
- return Collections.emptyList();
- }
- PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
- deviceEvents = Futures.getUnchecked(
- clusterCommunicator.sendAndReceive(portInjectedEvent,
- PORT_INJECTED,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master));
- }
- return deviceEvents == null ? Collections.emptyList() : deviceEvents;
- }
-
- private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
- DeviceId deviceId,
- Optional<PortNumber> portNumber) {
- Device device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- List<DeviceEvent> events = Lists.newArrayList();
-
- Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
- List<PortDescription> descriptions = Lists.newArrayList();
- portDescriptions.entrySet().forEach(e -> {
- PortKey key = e.getKey();
- PortDescription value = e.getValue();
- if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
- if (portNumber.isPresent()) {
- if (portNumber.get().equals(key.portNumber())) {
- descriptions.add(value);
- }
- } else {
- descriptions.add(value);
- }
- }
- });
-
- for (PortDescription description : descriptions) {
- final PortNumber number = description.portNumber();
- ports.compute(number, (k, existingPort) -> {
- Port newPort = composePort(device, number);
- if (existingPort == null) {
- events.add(new DeviceEvent(PORT_ADDED, device, newPort));
- } else {
- if (existingPort.isEnabled() != newPort.isEnabled() ||
- existingPort.type() != newPort.type() ||
- existingPort.portSpeed() != newPort.portSpeed() ||
- !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
- events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
- }
- }
- return newPort;
- });
- }
-
- return events;
- }
-
- /**
- * Returns a Port, merging descriptions from multiple Providers.
- *
- * @param device device the port is on
- * @param number port number
- * @return Port instance
- */
- private Port composePort(Device device, PortNumber number) {
-
- Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
- portDescriptions.entrySet().forEach(entry -> {
- PortKey portKey = entry.getKey();
- if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
- descriptions.put(portKey.providerId(), entry.getValue());
- }
- });
- ProviderId primary = getPrimaryProviderId(device.id());
- PortDescription primaryDescription = descriptions.get(primary);
-
- // if no primary, assume not enabled
- boolean isEnabled = false;
- DefaultAnnotations annotations = DefaultAnnotations.builder().build();
- if (primaryDescription != null) {
- isEnabled = primaryDescription.isEnabled();
- annotations = merge(annotations, primaryDescription.annotations());
- }
- Port updated = null;
- for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
- if (e.getKey().equals(primary)) {
- continue;
- }
- annotations = merge(annotations, e.getValue().annotations());
- updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
- }
- if (primaryDescription == null) {
- return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
- }
- return updated == null
- ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
- : updated;
- }
-
- private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
- PortDescription description, Annotations annotations) {
- switch (description.type()) {
- case OMS:
- OmsPortDescription omsDesc = (OmsPortDescription) description;
- return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
- omsDesc.maxFrequency(), omsDesc.grid(), annotations);
- case OCH:
- OchPortDescription ochDesc = (OchPortDescription) description;
- return new OchPort(device, number, isEnabled, ochDesc.signalType(),
- ochDesc.isTunable(), ochDesc.lambda(), annotations);
- case ODUCLT:
- OduCltPortDescription oduDesc = (OduCltPortDescription) description;
- return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
- default:
- return new DefaultPort(device, number, isEnabled, description.type(),
- description.portSpeed(), annotations);
- }
- }
-
- @Override
- public DeviceEvent updatePortStatus(ProviderId providerId,
- DeviceId deviceId,
- PortDescription portDescription) {
- portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
- List<DeviceEvent> events =
- refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
- return Iterables.getFirst(events, null);
- }
-
- @Override
- public List<Port> getPorts(DeviceId deviceId) {
- return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
- }
-
- @Override
- public Port getPort(DeviceId deviceId, PortNumber portNumber) {
- return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
- }
-
- @Override
- public DeviceEvent updatePortStatistics(ProviderId providerId,
- DeviceId deviceId,
- Collection<PortStatistics> newStatsCollection) {
-
- Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
- Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
- Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
-
- if (prvStatsMap != null) {
- for (PortStatistics newStats : newStatsCollection) {
- PortNumber port = PortNumber.portNumber(newStats.port());
- PortStatistics prvStats = prvStatsMap.get(port);
- DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
- PortStatistics deltaStats = builder.build();
- if (prvStats != null) {
- deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
- }
- deltaStatsMap.put(port, deltaStats);
- newStatsMap.put(port, newStats);
- }
- } else {
- for (PortStatistics newStats : newStatsCollection) {
- PortNumber port = PortNumber.portNumber(newStats.port());
- newStatsMap.put(port, newStats);
- }
- }
- devicePortDeltaStats.put(deviceId, deltaStatsMap);
- devicePortStats.put(deviceId, newStatsMap);
- // DeviceEvent returns null because of InternalPortStatsListener usage
- return null;
- }
-
- /**
- * Calculate delta statistics by subtracting previous from new statistics.
- *
- * @param deviceId device indentifier
- * @param prvStats previous port statistics
- * @param newStats new port statistics
- * @return PortStatistics
- */
- public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
- // calculate time difference
- long deltaStatsSec, deltaStatsNano;
- if (newStats.durationNano() < prvStats.durationNano()) {
- deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
- deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
- } else {
- deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
- deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
- }
- DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
- DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
- .setPort(newStats.port())
- .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
- .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
- .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
- .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
- .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
- .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
- .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
- .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
- .setDurationSec(deltaStatsSec)
- .setDurationNano(deltaStatsNano)
- .build();
- return deltaStats;
- }
-
- @Override
- public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
- Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
- if (portStats == null) {
- return Collections.emptyList();
- }
- return ImmutableList.copyOf(portStats.values());
- }
-
- @Override
- public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
- Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
- if (portStats == null) {
- return Collections.emptyList();
- }
- return ImmutableList.copyOf(portStats.values());
- }
-
- @Override
- public boolean isAvailable(DeviceId deviceId) {
- return availableDevices.contains(deviceId);
- }
-
- @Override
- public Iterable<Device> getAvailableDevices() {
- return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
- }
-
- @Override
- public DeviceEvent removeDevice(DeviceId deviceId) {
- NodeId master = mastershipService.getMasterFor(deviceId);
- // if there exist a master, forward
- // if there is no master, try to become one and process
- boolean relinquishAtEnd = false;
- if (master == null) {
- final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
- if (myRole != MastershipRole.NONE) {
- relinquishAtEnd = true;
- }
- log.debug("Temporarily requesting role for {} to remove", deviceId);
- MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
- if (role == MastershipRole.MASTER) {
- master = localNodeId;
- }
- }
-
- if (!localNodeId.equals(master)) {
- log.debug("{} has control of {}, forwarding remove request",
- master, deviceId);
-
- clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
- .whenComplete((r, e) -> {
- if (e != null) {
- log.error("Failed to forward {} remove request to its master", deviceId, e);
- }
- });
- return null;
- }
-
- // I have control..
- DeviceEvent event = null;
- final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
- DeviceDescription removedDeviceDescription =
- deviceDescriptions.remove(deviceKey);
- if (removedDeviceDescription != null) {
- event = purgeDeviceCache(deviceId);
- }
-
- if (relinquishAtEnd) {
- log.debug("Relinquishing temporary role acquired for {}", deviceId);
- mastershipService.relinquishMastership(deviceId);
- }
- return event;
- }
-
- private DeviceEvent injectDevice(DeviceInjectedEvent event) {
- return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
- }
-
- private List<DeviceEvent> injectPort(PortInjectedEvent event) {
- return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
- }
-
- private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
- ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
- DeviceDescription primaryDeviceDescription =
- deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
- DefaultAnnotations annotations = DefaultAnnotations.builder().build();
- annotations = merge(annotations, primaryDeviceDescription.annotations());
- for (ProviderId providerId : getAllProviders(deviceId)) {
- if (!providerId.equals(primaryProviderId)) {
- annotations = merge(annotations,
- deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
- }
- }
- return annotations;
- }
-
- private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
- @Override
- public void event(SetEvent<DeviceId> event) {
- final DeviceId deviceId = event.entry();
- final Device device = devices.get(deviceId);
- if (device != null) {
- notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
- } else {
- pendingAvailableChangeUpdates.add(deviceId);
- }
- }
- }
-
- private class InternalDeviceChangeEventListener
- implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
- @Override
- public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
- DeviceId deviceId = event.key().deviceId();
- ProviderId providerId = event.key().providerId();
- if (event.type() == PUT) {
- notifyDelegate(refreshDeviceCache(providerId, deviceId));
- if (pendingAvailableChangeUpdates.remove(deviceId)) {
- notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
- }
- } else if (event.type() == REMOVE) {
- notifyDelegate(purgeDeviceCache(deviceId));
- }
- }
- }
-
- private class InternalPortChangeEventListener
- implements EventuallyConsistentMapListener<PortKey, PortDescription> {
- @Override
- public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
- DeviceId deviceId = event.key().deviceId();
- ProviderId providerId = event.key().providerId();
- PortNumber portNumber = event.key().portNumber();
- if (event.type() == PUT) {
- if (devices.containsKey(deviceId)) {
- List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
- for (DeviceEvent deviceEvent : events) {
- notifyDelegate(deviceEvent);
- }
- }
- } else if (event.type() == REMOVE) {
- log.warn("Unexpected port removed event");
- }
- }
- }
-
- private class InternalPortStatsListener
- implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
- @Override
- public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
- if (event.type() == PUT) {
- Device device = devices.get(event.key());
- if (device != null) {
- delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
- }
- }
- }
- }
-} \ No newline at end of file