diff options
Diffstat (limited to 'framework/src/onos/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java')
-rw-r--r-- | framework/src/onos/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java | 194 |
1 files changed, 148 insertions, 46 deletions
diff --git a/framework/src/onos/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java b/framework/src/onos/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java index cb8acab2..ba707800 100644 --- a/framework/src/onos/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java +++ b/framework/src/onos/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java @@ -15,6 +15,8 @@ */ package org.onosproject.cordvtn; +import com.google.common.collect.Collections2; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -22,31 +24,39 @@ import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.core.ApplicationId; import org.onosproject.core.CoreService; +import org.onosproject.mastership.MastershipService; import org.onosproject.net.Device; import org.onosproject.net.DeviceId; import org.onosproject.net.Host; +import org.onosproject.net.behaviour.ControllerInfo; import org.onosproject.net.device.DeviceEvent; import org.onosproject.net.device.DeviceListener; import org.onosproject.net.device.DeviceService; import org.onosproject.net.host.HostEvent; import org.onosproject.net.host.HostListener; import org.onosproject.net.host.HostService; +import org.onosproject.ovsdb.controller.OvsdbClientService; +import org.onosproject.ovsdb.controller.OvsdbController; +import org.onosproject.ovsdb.controller.OvsdbNodeId; import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.LogicalClockService; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkNotNull; import static org.onlab.util.Tools.groupedThreads; -import static org.onosproject.cordvtn.OvsdbNode.State; -import static org.onosproject.cordvtn.OvsdbNode.State.INIT; -import static org.onosproject.cordvtn.OvsdbNode.State.DISCONNECT; import static org.onosproject.net.Device.Type.SWITCH; import static org.slf4j.LoggerFactory.getLogger; @@ -63,7 +73,17 @@ public class CordVtn implements CordVtnService { private static final int NUM_THREADS = 1; private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder() .register(KryoNamespaces.API) - .register(OvsdbNode.class); + .register(DefaultOvsdbNode.class); + private static final String DEFAULT_BRIDGE_NAME = "br-int"; + private static final Map<String, String> VXLAN_OPTIONS = new HashMap<String, String>() { + { + put("key", "flow"); + put("local_ip", "flow"); + put("remote_ip", "flow"); + } + }; + private static final int DPID_BEGIN = 3; + private static final int OFPORT = 6653; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected CoreService coreService; @@ -72,14 +92,20 @@ public class CordVtn implements CordVtnService { protected StorageService storageService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected LogicalClockService clockService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceService deviceService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected HostService hostService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected OvsdbController controller; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + private final ExecutorService eventExecutor = Executors .newFixedThreadPool(NUM_THREADS, groupedThreads("onos/cordvtn", "event-handler")); @@ -90,15 +116,16 @@ public class CordVtn implements CordVtnService { private final BridgeHandler bridgeHandler = new BridgeHandler(); private final VmHandler vmHandler = new VmHandler(); - private EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore; + private ConsistentMap<DeviceId, OvsdbNode> nodeStore; + private ApplicationId appId; @Activate protected void activate() { - coreService.registerApplication("org.onosproject.cordvtn"); - nodeStore = storageService.<DeviceId, OvsdbNode>eventuallyConsistentMapBuilder() + appId = coreService.registerApplication("org.onosproject.cordvtn"); + nodeStore = storageService.<DeviceId, OvsdbNode>consistentMapBuilder() + .withSerializer(Serializer.using(NODE_SERIALIZER.build())) .withName("cordvtn-nodestore") - .withSerializer(NODE_SERIALIZER) - .withTimestampProvider((k, v) -> clockService.getTimestamp()) + .withApplicationId(appId) .build(); deviceService.addListener(deviceListener); @@ -113,43 +140,59 @@ public class CordVtn implements CordVtnService { hostService.removeListener(hostListener); eventExecutor.shutdown(); - nodeStore.destroy(); + nodeStore.clear(); log.info("Stopped"); } @Override - public void addNode(OvsdbNode ovsdbNode) { - if (nodeStore.containsKey(ovsdbNode.deviceId())) { - log.warn("Node {} already exists", ovsdbNode.host()); + public void addNode(OvsdbNode ovsdb) { + checkNotNull(ovsdb); + nodeStore.put(ovsdb.deviceId(), ovsdb); + } + + @Override + public void deleteNode(OvsdbNode ovsdb) { + checkNotNull(ovsdb); + + if (!nodeStore.containsKey(ovsdb.deviceId())) { return; } - nodeStore.put(ovsdbNode.deviceId(), ovsdbNode); - if (ovsdbNode.state() != INIT) { - updateNode(ovsdbNode, INIT); + + // check ovsdb and integration bridge connection state first + if (isNodeConnected(ovsdb)) { + log.warn("Cannot delete connected node {}", ovsdb.host()); + } else { + nodeStore.remove(ovsdb.deviceId()); } } @Override - public void deleteNode(OvsdbNode ovsdbNode) { - if (!nodeStore.containsKey(ovsdbNode.deviceId())) { - log.warn("Node {} does not exist", ovsdbNode.host()); + public void connect(OvsdbNode ovsdb) { + checkNotNull(ovsdb); + + if (!nodeStore.containsKey(ovsdb.deviceId())) { + log.warn("Node {} does not exist", ovsdb.host()); return; } - updateNode(ovsdbNode, DISCONNECT); + controller.connect(ovsdb.ip(), ovsdb.port()); } @Override - public void updateNode(OvsdbNode ovsdbNode, State state) { - if (!nodeStore.containsKey(ovsdbNode.deviceId())) { - log.warn("Node {} does not exist", ovsdbNode.host()); + public void disconnect(OvsdbNode ovsdb) { + checkNotNull(ovsdb); + + if (!nodeStore.containsKey(ovsdb.deviceId())) { + log.warn("Node {} does not exist", ovsdb.host()); return; } - DefaultOvsdbNode updatedNode = new DefaultOvsdbNode(ovsdbNode.host(), - ovsdbNode.ip(), - ovsdbNode.port(), - state); - nodeStore.put(ovsdbNode.deviceId(), updatedNode); + + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb); + checkNotNull(ovsdbClient); + + if (ovsdbClient.isConnected()) { + ovsdbClient.disconnect(); + } } @Override @@ -159,14 +202,42 @@ public class CordVtn implements CordVtnService { @Override public OvsdbNode getNode(DeviceId deviceId) { - return nodeStore.get(deviceId); + Versioned<OvsdbNode> ovsdb = nodeStore.get(deviceId); + if (ovsdb != null) { + return ovsdb.value(); + } else { + return null; + } } @Override public List<OvsdbNode> getNodes() { - return nodeStore.values() - .stream() - .collect(Collectors.toList()); + List<OvsdbNode> ovsdbs = new ArrayList<>(); + ovsdbs.addAll(Collections2.transform(nodeStore.values(), Versioned::value)); + return ovsdbs; + } + + @Override + public boolean isNodeConnected(OvsdbNode ovsdb) { + checkNotNull(ovsdb); + + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb); + if (ovsdbClient == null) { + return false; + } else { + return ovsdbClient.isConnected(); + } + } + + private OvsdbClientService getOvsdbClient(OvsdbNode ovsdb) { + checkNotNull(ovsdb); + + OvsdbClientService ovsdbClient = controller.getOvsdbClient( + new OvsdbNodeId(ovsdb.ip(), ovsdb.port().toInt())); + if (ovsdbClient == null) { + log.warn("Couldn't find ovsdb client of node {}", ovsdb.host()); + } + return ovsdbClient; } private class InternalDeviceListener implements DeviceListener { @@ -182,6 +253,7 @@ public class CordVtn implements CordVtnService { break; case DEVICE_AVAILABILITY_CHANGED: eventExecutor.submit(() -> handler.disconnected(device)); + // TODO handle the case that the device is recovered break; default: break; @@ -212,14 +284,27 @@ public class CordVtn implements CordVtnService { @Override public void connected(Device device) { - // create bridge and set bridgeId - // set node state connected + log.info("Ovsdb {} is connected", device.id()); + + if (!mastershipService.isLocalMaster(device.id())) { + return; + } + + // TODO change to use bridge config + OvsdbNode ovsdb = getNode(device.id()); + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb); + + List<ControllerInfo> controllers = new ArrayList<>(); + Sets.newHashSet(clusterService.getNodes()).forEach(controller -> + controllers.add(new ControllerInfo(controller.ip(), OFPORT, "tcp"))); + String dpid = ovsdb.intBrId().toString().substring(DPID_BEGIN); + + ovsdbClient.createBridge(DEFAULT_BRIDGE_NAME, dpid, controllers); } @Override public void disconnected(Device device) { - // set node state disconnected if the node exists - // which means that the node is not deleted explicitly + log.warn("Ovsdb {} is disconnected", device.id()); } } @@ -227,12 +312,29 @@ public class CordVtn implements CordVtnService { @Override public void connected(Device device) { - // create vxlan port + log.info("Integration Bridge {} is detected", device.id()); + + OvsdbNode ovsdb = getNodes().stream() + .filter(node -> node.intBrId().equals(device.id())) + .findFirst().get(); + + if (ovsdb == null) { + log.warn("Couldn't find OVSDB associated with {}", device.id()); + return; + } + + if (!mastershipService.isLocalMaster(ovsdb.deviceId())) { + return; + } + + // TODO change to use tunnel config and tunnel description + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb); + ovsdbClient.createTunnel(DEFAULT_BRIDGE_NAME, "vxlan", "vxlan", VXLAN_OPTIONS); } @Override public void disconnected(Device device) { - + log.info("Integration Bridge {} is vanished", device.id()); } } @@ -240,12 +342,12 @@ public class CordVtn implements CordVtnService { @Override public void connected(Host host) { - // install flow rules for this vm + log.info("VM {} is detected", host.id()); } @Override public void disconnected(Host host) { - // uninstall flow rules associated with this vm + log.info("VM {} is vanished", host.id()); } } } |