aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java62
1 files changed, 11 insertions, 51 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 97333ebf..a999ee7f 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -28,19 +28,11 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.flow.instructions.L0ModificationInstruction;
-import org.onosproject.net.flow.instructions.L2ModificationInstruction;
-import org.onosproject.net.flow.instructions.L3ModificationInstruction;
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
@@ -61,9 +53,7 @@ import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.MultiValuedTimestamp;
-import org.onosproject.store.serializers.DeviceIdSerializer;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.URISerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
@@ -71,7 +61,6 @@ import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -139,9 +128,12 @@ public class DistributedGroupStore
private final AtomicLong sequenceNumber = new AtomicLong(0);
+ private KryoNamespace clusterMsgSerializer;
+
@Activate
public void activate() {
kryoBuilder = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
.register(DefaultGroup.class,
DefaultGroupBucket.class,
DefaultGroupDescription.class,
@@ -158,38 +150,9 @@ public class DistributedGroupStore
GroupStoreKeyMapKey.class,
GroupStoreIdMapKey.class,
GroupStoreMapKey.class
- )
- .register(new URISerializer(), URI.class)
- .register(new DeviceIdSerializer(), DeviceId.class)
- .register(PortNumber.class)
- .register(DefaultApplicationId.class)
- .register(DefaultTrafficTreatment.class,
- Instructions.DropInstruction.class,
- Instructions.OutputInstruction.class,
- Instructions.GroupInstruction.class,
- Instructions.TableTypeTransition.class,
- FlowRule.Type.class,
- L0ModificationInstruction.class,
- L0ModificationInstruction.L0SubType.class,
- L0ModificationInstruction.ModLambdaInstruction.class,
- L2ModificationInstruction.class,
- L2ModificationInstruction.L2SubType.class,
- L2ModificationInstruction.ModEtherInstruction.class,
- L2ModificationInstruction.PushHeaderInstructions.class,
- L2ModificationInstruction.ModVlanIdInstruction.class,
- L2ModificationInstruction.ModVlanPcpInstruction.class,
- L2ModificationInstruction.ModMplsLabelInstruction.class,
- L2ModificationInstruction.ModMplsTtlInstruction.class,
- L3ModificationInstruction.class,
- L3ModificationInstruction.L3SubType.class,
- L3ModificationInstruction.ModIPInstruction.class,
- L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
- L3ModificationInstruction.ModTtlInstruction.class,
- org.onlab.packet.MplsLabel.class
- )
- .register(org.onosproject.cluster.NodeId.class)
- .register(KryoNamespaces.BASIC)
- .register(KryoNamespaces.MISC);
+ );
+
+ clusterMsgSerializer = kryoBuilder.build();
messageHandlingExecutor = Executors.
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
@@ -197,7 +160,7 @@ public class DistributedGroupStore
"message-handlers"));
clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build()::deserialize,
+ clusterMsgSerializer::deserialize,
this::process,
messageHandlingExecutor);
@@ -233,6 +196,7 @@ public class DistributedGroupStore
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
groupStoreEntriesByKey.destroy();
auditPendingReqQueue.destroy();
log.info("Stopped");
@@ -313,8 +277,6 @@ public class DistributedGroupStore
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
- log.debug("getGroups: for device {} total number of groups {}",
- deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId))
.transform(input -> input);
@@ -322,8 +284,6 @@ public class DistributedGroupStore
private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
- log.debug("getGroups: for device {} total number of groups {}",
- deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId));
}
@@ -411,7 +371,7 @@ public class DistributedGroupStore
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- m -> kryoBuilder.build().serialize(m),
+ clusterMsgSerializer::serialize,
mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
@@ -609,7 +569,7 @@ public class DistributedGroupStore
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- m -> kryoBuilder.build().serialize(m),
+ clusterMsgSerializer::serialize,
mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
@@ -741,7 +701,7 @@ public class DistributedGroupStore
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- m -> kryoBuilder.build().serialize(m),
+ clusterMsgSerializer::serialize,
mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",