aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java52
1 files changed, 16 insertions, 36 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 6ea7c220..3e89635a 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -18,7 +18,6 @@ package org.onosproject.store.consistent.impl;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -50,12 +49,12 @@ import org.apache.felix.scr.annotations.Service;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationListener;
import org.onosproject.app.ApplicationService;
+import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.IdGenerator;
-import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
-import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
@@ -73,8 +72,6 @@ import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContextBuilder;
import org.slf4j.Logger;
-import java.io.File;
-import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -99,8 +96,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private final Logger log = getLogger(getClass());
- public static final int COPYCAT_TCP_PORT = 9876;
- public static final String PARTITION_DEFINITION_FILE = "../config/tablets.json";
public static final String BASE_PARTITION_NAME = "p0";
private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
@@ -122,6 +117,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
Multimaps.synchronizedMultimap(ArrayListMultimap.create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterMetadataService clusterMetadataService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
@@ -130,8 +128,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
- protected String nodeToUri(NodeInfo node) {
- return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
+ protected String nodeIdToUri(NodeId nodeId) {
+ ControllerNode node = clusterService.getNode(nodeId);
+ return String.format("onos://%s:%d", node.ip(), node.tcpPort());
}
protected void bindApplicationService(ApplicationService service) {
@@ -147,30 +146,22 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
- // load database configuration
- File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
- log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
- Map<String, Set<NodeInfo>> partitionMap;
- try {
- DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(databaseDefFile);
- if (!databaseDefFile.exists()) {
- createDefaultDatabaseDefinition(databaseDefStore);
- }
- partitionMap = databaseDefStore.read().getPartitions();
- } catch (IOException e) {
- throw new IllegalStateException("Failed to load database config", e);
- }
+ Map<String, Set<NodeId>> partitionMap = Maps.newHashMap();
+ clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> {
+ partitionMap.put(p.getName(), Sets.newHashSet(p.getMembers()));
+ });
+
String[] activeNodeUris = partitionMap.values()
.stream()
.reduce((s1, s2) -> Sets.union(s1, s2))
.get()
.stream()
- .map(this::nodeToUri)
+ .map(this::nodeIdToUri)
.toArray(String[]::new);
- String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
+ String localNodeUri = nodeIdToUri(clusterMetadataService.getLocalNode().id());
Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
ClusterConfig clusterConfig = new ClusterConfig()
@@ -198,7 +189,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
List<Database> partitions = partitionMap.entrySet()
.stream()
.map(entry -> {
- String[] replicas = entry.getValue().stream().map(this::nodeToUri).toArray(String[]::new);
+ String[] replicas = entry.getValue().stream().map(this::nodeIdToUri).toArray(String[]::new);
return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
})
.map(config -> {
@@ -229,17 +220,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
log.info("Started");
}
- private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
- // Assumes IPv4 is returned.
- String ip = ClusterDefinitionManager.getSiteLocalAddress();
- NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
- try {
- store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
- } catch (IOException e) {
- log.warn("Unable to write default cluster definition", e);
- }
- }
-
@Deactivate
public void deactivate() {
CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())