/* * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onosproject.store.consistent.impl; import java.util.List; import java.util.Map; import java.util.Set; import org.onlab.util.HexString; import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.DatabaseUpdate; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.TransactionContext; import org.onosproject.store.service.TransactionalMap; import org.onosproject.store.service.Versioned; import static com.google.common.base.Preconditions.*; import com.google.common.base.Objects; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** * Default Transactional Map implementation that provides a repeatable reads * transaction isolation level. * * @param key type * @param value type. */ public class DefaultTransactionalMap implements TransactionalMap { private final TransactionContext txContext; private static final String TX_CLOSED_ERROR = "Transaction is closed"; private final ConsistentMap backingMap; private final String name; private final Serializer serializer; private final Map> readCache = Maps.newConcurrentMap(); private final Map writeCache = Maps.newConcurrentMap(); private final Set deleteSet = Sets.newConcurrentHashSet(); private static final String ERROR_NULL_VALUE = "Null values are not allowed"; private static final String ERROR_NULL_KEY = "Null key is not allowed"; private final LoadingCache keyCache = CacheBuilder.newBuilder() .softValues() .build(new CacheLoader() { @Override public String load(K key) { return HexString.toHexString(serializer.encode(key)); } }); protected K dK(String key) { return serializer.decode(HexString.fromHexString(key)); } public DefaultTransactionalMap( String name, ConsistentMap backingMap, TransactionContext txContext, Serializer serializer) { this.name = name; this.backingMap = backingMap; this.txContext = txContext; this.serializer = serializer; } @Override public V get(K key) { checkState(txContext.isOpen(), TX_CLOSED_ERROR); checkNotNull(key, ERROR_NULL_KEY); if (deleteSet.contains(key)) { return null; } V latest = writeCache.get(key); if (latest != null) { return latest; } else { Versioned v = readCache.computeIfAbsent(key, k -> backingMap.get(k)); return v != null ? v.value() : null; } } @Override public V put(K key, V value) { checkState(txContext.isOpen(), TX_CLOSED_ERROR); checkNotNull(value, ERROR_NULL_VALUE); V latest = get(key); writeCache.put(key, value); deleteSet.remove(key); return latest; } @Override public V remove(K key) { checkState(txContext.isOpen(), TX_CLOSED_ERROR); V latest = get(key); if (latest != null) { writeCache.remove(key); deleteSet.add(key); } return latest; } @Override public boolean remove(K key, V value) { checkState(txContext.isOpen(), TX_CLOSED_ERROR); checkNotNull(value, ERROR_NULL_VALUE); V latest = get(key); if (Objects.equal(value, latest)) { remove(key); return true; } return false; } @Override public boolean replace(K key, V oldValue, V newValue) { checkState(txContext.isOpen(), TX_CLOSED_ERROR); checkNotNull(oldValue, ERROR_NULL_VALUE); checkNotNull(newValue, ERROR_NULL_VALUE); V latest = get(key); if (Objects.equal(oldValue, latest)) { put(key, newValue); return true; } return false; } @Override public V putIfAbsent(K key, V value) { checkState(txContext.isOpen(), TX_CLOSED_ERROR); checkNotNull(value, ERROR_NULL_VALUE); V latest = get(key); if (latest == null) { put(key, value); } return latest; } protected List prepareDatabaseUpdates() { List updates = Lists.newLinkedList(); deleteSet.forEach(key -> { Versioned original = readCache.get(key); if (original != null) { updates.add(DatabaseUpdate.newBuilder() .withMapName(name) .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH) .withKey(keyCache.getUnchecked(key)) .withCurrentVersion(original.version()) .build()); } }); writeCache.forEach((key, value) -> { Versioned original = readCache.get(key); if (original == null) { updates.add(DatabaseUpdate.newBuilder() .withMapName(name) .withType(DatabaseUpdate.Type.PUT_IF_ABSENT) .withKey(keyCache.getUnchecked(key)) .withValue(serializer.encode(value)) .build()); } else { updates.add(DatabaseUpdate.newBuilder() .withMapName(name) .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH) .withKey(keyCache.getUnchecked(key)) .withCurrentVersion(original.version()) .withValue(serializer.encode(value)) .build()); } }); return updates; } /** * Discards all changes made to this transactional map. */ protected void rollback() { readCache.clear(); writeCache.clear(); deleteSet.clear(); } }