aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java')
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java258
1 files changed, 258 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
new file mode 100644
index 00000000..32890cb1
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.meter.impl;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.meter.Band;
+import org.onosproject.net.meter.DefaultBand;
+import org.onosproject.net.meter.DefaultMeter;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterEvent;
+import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterState;
+import org.onosproject.net.meter.MeterStore;
+import org.onosproject.net.meter.MeterStoreDelegate;
+import org.onosproject.net.meter.MeterStoreResult;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A distributed meter store implementation. Meters are stored consistently
+ * across the cluster.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
+ implements MeterStore {
+
+ private Logger log = getLogger(getClass());
+
+ private static final String METERSTORE = "onos-meter-store";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ private ConsistentMap<MeterId, MeterData> meters;
+ private NodeId local;
+
+ private MapEventListener mapListener = new InternalMapEventListener();
+
+ private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
+ Maps.newConcurrentMap();
+
+ @Activate
+ public void activate() {
+
+ local = clusterService.getLocalNode().id();
+
+
+ meters = storageService.<MeterId, MeterData>consistentMapBuilder()
+ .withName(METERSTORE)
+ .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+ MeterData.class,
+ DefaultMeter.class,
+ DefaultBand.class,
+ Band.Type.class,
+ MeterState.class,
+ Meter.Unit.class,
+ MeterFailReason.class,
+ MeterId.class)).build();
+
+ meters.addListener(mapListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+
+ meters.removeListener(mapListener);
+ log.info("Stopped");
+ }
+
+
+ @Override
+ public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future);
+ MeterData data = new MeterData(meter, null, local);
+
+ try {
+ meters.put(meter.id(), data);
+ } catch (StorageException e) {
+ future.completeExceptionally(e);
+ }
+
+ return future;
+
+ }
+
+ @Override
+ public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future);
+
+ MeterData data = new MeterData(meter, null, local);
+
+ // update the state of the meter. It will be pruned by observing
+ // that it has been removed from the dataplane.
+ try {
+ if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+ future.complete(MeterStoreResult.success());
+ }
+ } catch (StorageException e) {
+ future.completeExceptionally(e);
+ }
+
+
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future);
+
+ MeterData data = new MeterData(meter, null, local);
+ try {
+ if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+ future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
+ }
+ } catch (StorageException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ @Override
+ public void updateMeterState(Meter meter) {
+ meters.computeIfPresent(meter.id(), (id, v) -> {
+ DefaultMeter m = (DefaultMeter) v.meter();
+ m.setState(meter.state());
+ m.setProcessedPackets(meter.packetsSeen());
+ m.setProcessedBytes(meter.bytesSeen());
+ m.setLife(meter.life());
+ // TODO: Prune if drops to zero.
+ m.setReferenceCount(meter.referenceCount());
+ return new MeterData(m, null, v.origin());
+ });
+ }
+
+ @Override
+ public Meter getMeter(MeterId meterId) {
+ MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
+ return data == null ? null : data.meter();
+ }
+
+ @Override
+ public Collection<Meter> getAllMeters() {
+ return Collections2.transform(meters.asJavaMap().values(),
+ MeterData::meter);
+ }
+
+ @Override
+ public void failedMeter(MeterOperation op, MeterFailReason reason) {
+ meters.computeIfPresent(op.meter().id(), (k, v) ->
+ new MeterData(v.meter(), reason, v.origin()));
+ }
+
+ @Override
+ public void deleteMeterNow(Meter m) {
+ futures.remove(m.id());
+ meters.remove(m.id());
+ }
+
+ private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
+ @Override
+ public void event(MapEvent<MeterId, MeterData> event) {
+ MeterData data = event.value().value();
+ NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
+ switch (event.type()) {
+ case INSERT:
+ case UPDATE:
+ switch (data.meter().state()) {
+ case PENDING_ADD:
+ case PENDING_REMOVE:
+ if (!data.reason().isPresent() && local.equals(master)) {
+ notifyDelegate(
+ new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
+ MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
+ data.meter()));
+ } else if (data.reason().isPresent() && local.equals(data.origin())) {
+ MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
+ //TODO: No future -> no friend
+ futures.get(data.meter().id()).complete(msr);
+ }
+ break;
+ case ADDED:
+ if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_ADD) {
+ futures.remove(data.meter().id()).complete(MeterStoreResult.success());
+ }
+ break;
+ case REMOVED:
+ if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
+ futures.remove(data.meter().id()).complete(MeterStoreResult.success());
+ }
+ break;
+ default:
+ log.warn("Unknown meter state type {}", data.meter().state());
+ }
+ break;
+ case REMOVE:
+ //Only happens at origin so we do not need to care.
+ break;
+ default:
+ log.warn("Unknown Map event type {}", event.type());
+ }
+
+ }
+ }
+
+
+}