From 13d05bc8458758ee39cb829098241e89616717ee Mon Sep 17 00:00:00 2001 From: Ashlee Young Date: Wed, 9 Sep 2015 22:15:21 -0700 Subject: ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60 Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd --- framework/src/onos/ovsdb/ctl/pom.xml | 65 ++++ .../controller/impl/ChannelConnectionListener.java | 43 +++ .../ovsdb/controller/impl/Controller.java | 201 ++++++++++ .../ovsdb/controller/impl/MessageDecoder.java | 55 +++ .../ovsdb/controller/impl/OvsdbControllerImpl.java | 413 +++++++++++++++++++++ .../ovsdb/controller/impl/OvsdbJsonRpcHandler.java | 131 +++++++ .../ovsdb/controller/impl/package-info.java | 20 + 7 files changed, 928 insertions(+) create mode 100644 framework/src/onos/ovsdb/ctl/pom.xml create mode 100644 framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/ChannelConnectionListener.java create mode 100644 framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java create mode 100644 framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/MessageDecoder.java create mode 100644 framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java create mode 100644 framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java create mode 100644 framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/package-info.java (limited to 'framework/src/onos/ovsdb/ctl') diff --git a/framework/src/onos/ovsdb/ctl/pom.xml b/framework/src/onos/ovsdb/ctl/pom.xml new file mode 100644 index 00000000..60c1b439 --- /dev/null +++ b/framework/src/onos/ovsdb/ctl/pom.xml @@ -0,0 +1,65 @@ + + + + 4.0.0 + + org.onosproject + onos-ovsdb + 1.3.0-SNAPSHOT + ../pom.xml + + + onos-ovsdb-ctl + bundle + + + + junit + junit + test + + + org.apache.felix + org.apache.felix.scr.annotations + + + org.osgi + org.osgi.compendium + + + org.onosproject + onos-ovsdb-api + ${project.version} + + + org.onosproject + onos-ovsdb-rfc + ${project.version} + + + + + + + org.apache.felix + maven-scr-plugin + + + + diff --git a/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/ChannelConnectionListener.java b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/ChannelConnectionListener.java new file mode 100644 index 00000000..f17c25f7 --- /dev/null +++ b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/ChannelConnectionListener.java @@ -0,0 +1,43 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ovsdb.controller.impl; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; + +import org.onosproject.ovsdb.controller.driver.OvsdbProviderService; + +/** + * The listener class. Handles when the node disconnect. + */ +public class ChannelConnectionListener implements ChannelFutureListener { + + private final OvsdbProviderService providerService; + + /** + * Constructor from a OvsdbProviderService providerService. + * + * @param providerService the providerService to use + */ + public ChannelConnectionListener(OvsdbProviderService providerService) { + this.providerService = providerService; + } + + @Override + public void operationComplete(ChannelFuture arg0) { + providerService.nodeRemoved(); + } +} diff --git a/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java new file mode 100644 index 00000000..07582327 --- /dev/null +++ b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java @@ -0,0 +1,201 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ovsdb.controller.impl; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; + +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.onlab.packet.IpAddress; +import org.onosproject.ovsdb.controller.OvsdbConstant; +import org.onosproject.ovsdb.controller.OvsdbNodeId; +import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient; +import org.onosproject.ovsdb.controller.driver.OvsdbAgent; +import org.onosproject.ovsdb.controller.driver.OvsdbProviderService; +import org.onosproject.ovsdb.rfc.jsonrpc.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The main controller class. Handles all setup and network listeners - + * Distributed ovsdbClient. + */ +public class Controller { + protected static final Logger log = LoggerFactory + .getLogger(Controller.class); + + private int ovsdbPort = OvsdbConstant.OVSDBPORT; + + private OvsdbAgent agent; + private Callback monitorCallback; + + private final ExecutorService executorService = Executors + .newFixedThreadPool(10); + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Class serverChannelClass; + + /** + * Initialization. + */ + private void initEventLoopGroup() { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + serverChannelClass = NioServerSocketChannel.class; + } + + /** + * Accepts incoming connections. + */ + private void startAcceptingConnections() throws InterruptedException { + ServerBootstrap b = new ServerBootstrap(); + + b.group(bossGroup, workerGroup).channel(serverChannelClass) + .childHandler(new OnosCommunicationChannelInitializer()); + b.option(ChannelOption.SO_BACKLOG, 128); + b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024); + b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024); + b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + b.childOption(ChannelOption.SO_KEEPALIVE, true); + b.bind(ovsdbPort).sync(); + } + + /** + * Tells controller that we're ready to accept ovsdb node loop. + * @throws InterruptedException if thread is interrupted + */ + public void run() throws InterruptedException { + initEventLoopGroup(); + startAcceptingConnections(); + } + + /** + * Adds channel pipiline to handle a new connected node. + */ + private class OnosCommunicationChannelInitializer + extends ChannelInitializer { + protected void initChannel(SocketChannel channel) throws Exception { + log.info("New channel created"); + channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); + channel.pipeline().addLast(new MessageDecoder()); + handleNewNodeConnection(channel); + + } + } + + /** + * Handles the new connection of a node. + * + * @param channel the channel to use. + */ + private void handleNewNodeConnection(final Channel channel) { + executorService.execute(new Runnable() { + @Override + public void run() { + log.info("Handle new node connection"); + + IpAddress ipAddress = IpAddress + .valueOf(((InetSocketAddress) channel.remoteAddress()) + .getAddress().getHostAddress()); + long port = ((InetSocketAddress) channel.remoteAddress()) + .getPort(); + + log.info("Get connection from ip address {} : {}", + ipAddress.toString(), port); + + OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port); + OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId, + agent, + monitorCallback, + channel); + ovsdbProviderService.setConnection(true); + OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler( + nodeId); + ovsdbJsonRpcHandler + .setOvsdbProviderService(ovsdbProviderService); + channel.pipeline().addLast(ovsdbJsonRpcHandler); + + ovsdbProviderService.nodeAdded(); + ChannelFuture closeFuture = channel.closeFuture(); + closeFuture + .addListener(new ChannelConnectionListener( + ovsdbProviderService)); + } + }); + } + + /** + * Gets an ovsdb client instance. + * + * @param nodeId data ovsdb node id + * @param agent OvsdbAgent + * @param monitorCallback Callback + * @param channel Channel + * @return OvsdbProviderService instance + */ + protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId, + OvsdbAgent agent, + Callback monitorCallback, + Channel channel) { + OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient( + nodeId); + ovsdbProviderService.setAgent(agent); + ovsdbProviderService.setCallback(monitorCallback); + ovsdbProviderService.setChannel(channel); + return ovsdbProviderService; + } + + /** + * Starts controller. + * + * @param agent OvsdbAgent + * @param monitorCallback Callback + */ + public void start(OvsdbAgent agent, Callback monitorCallback) { + this.agent = agent; + this.monitorCallback = monitorCallback; + try { + this.run(); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting to start"); + Thread.currentThread().interrupt(); + } + } + + /** + * Stops controller. + * + */ + public void stop() { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} diff --git a/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/MessageDecoder.java b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/MessageDecoder.java new file mode 100644 index 00000000..e0e22753 --- /dev/null +++ b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/MessageDecoder.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ovsdb.controller.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +import org.onosproject.ovsdb.rfc.jsonrpc.JsonReadContext; +import org.onosproject.ovsdb.rfc.utils.JsonRpcReaderUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Decoder for inbound messages. + */ +public class MessageDecoder extends ByteToMessageDecoder { + + private final Logger log = LoggerFactory.getLogger(MessageDecoder.class); + private final JsonReadContext context = new JsonReadContext(); + + /** + * Default constructor. + */ + public MessageDecoder() { + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buf, + List out) throws Exception { + log.debug("Message decoder"); + JsonRpcReaderUtil.readToJsonNode(buf, out, context); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { + log.error("Exception inside channel handling pipeline.", cause); + context.close(); + } +} diff --git a/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java new file mode 100644 index 00000000..9b482968 --- /dev/null +++ b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java @@ -0,0 +1,413 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ovsdb.controller.impl; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.math.BigInteger; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onlab.packet.IpAddress; +import org.onlab.packet.MacAddress; +import org.onosproject.ovsdb.controller.DefaultEventSubject; +import org.onosproject.ovsdb.controller.EventSubject; +import org.onosproject.ovsdb.controller.OvsdbClientService; +import org.onosproject.ovsdb.controller.OvsdbConstant; +import org.onosproject.ovsdb.controller.OvsdbController; +import org.onosproject.ovsdb.controller.OvsdbDatapathId; +import org.onosproject.ovsdb.controller.OvsdbEvent; +import org.onosproject.ovsdb.controller.OvsdbEvent.Type; +import org.onosproject.ovsdb.controller.OvsdbEventListener; +import org.onosproject.ovsdb.controller.OvsdbIfaceId; +import org.onosproject.ovsdb.controller.OvsdbNodeId; +import org.onosproject.ovsdb.controller.OvsdbNodeListener; +import org.onosproject.ovsdb.controller.OvsdbPortName; +import org.onosproject.ovsdb.controller.OvsdbPortNumber; +import org.onosproject.ovsdb.controller.OvsdbPortType; +import org.onosproject.ovsdb.controller.driver.OvsdbAgent; +import org.onosproject.ovsdb.rfc.jsonrpc.Callback; +import org.onosproject.ovsdb.rfc.message.TableUpdate; +import org.onosproject.ovsdb.rfc.message.TableUpdates; +import org.onosproject.ovsdb.rfc.message.UpdateNotification; +import org.onosproject.ovsdb.rfc.notation.OvsdbMap; +import org.onosproject.ovsdb.rfc.notation.OvsdbSet; +import org.onosproject.ovsdb.rfc.notation.Row; +import org.onosproject.ovsdb.rfc.notation.UUID; +import org.onosproject.ovsdb.rfc.schema.DatabaseSchema; +import org.onosproject.ovsdb.rfc.table.Bridge; +import org.onosproject.ovsdb.rfc.table.Interface; +import org.onosproject.ovsdb.rfc.table.OvsdbTable; +import org.onosproject.ovsdb.rfc.table.TableGenerator; +import org.onosproject.ovsdb.rfc.utils.FromJsonUtil; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * The implementation of OvsdbController. + */ +@Component(immediate = true) +@Service +public class OvsdbControllerImpl implements OvsdbController { + + public static final Logger log = LoggerFactory + .getLogger(OvsdbControllerImpl.class); + + protected ConcurrentHashMap ovsdbClients = + new ConcurrentHashMap(); + + protected OvsdbAgent agent = new InternalOvsdbNodeAgent(); + protected InternalMonitorCallBack updateCallback = new InternalMonitorCallBack(); + + protected Set ovsdbNodeListener = new CopyOnWriteArraySet<>(); + protected Set ovsdbEventListener = new CopyOnWriteArraySet<>(); + + protected ConcurrentHashMap requestNotification = + new ConcurrentHashMap(); + + protected ConcurrentHashMap requestDbName = new ConcurrentHashMap(); + + private final Controller controller = new Controller(); + + @Activate + public void activate(ComponentContext context) { + controller.start(agent, updateCallback); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + controller.stop(); + log.info("Stoped"); + } + + @Override + public void addNodeListener(OvsdbNodeListener listener) { + if (!ovsdbNodeListener.contains(listener)) { + this.ovsdbNodeListener.add(listener); + } + } + + @Override + public void removeNodeListener(OvsdbNodeListener listener) { + this.ovsdbNodeListener.remove(listener); + } + + @Override + public void addOvsdbEventListener(OvsdbEventListener listener) { + if (!ovsdbEventListener.contains(listener)) { + this.ovsdbEventListener.add(listener); + } + } + + @Override + public void removeOvsdbEventListener(OvsdbEventListener listener) { + this.ovsdbEventListener.remove(listener); + } + + @Override + public List getNodeIds() { + // TODO Auto-generated method stub + return null; + } + + @Override + public OvsdbClientService getOvsdbClient(OvsdbNodeId nodeId) { + return ovsdbClients.get(nodeId); + } + + /** + * Implementation of an Ovsdb Agent which is responsible for keeping track + * of connected node and the state in which they are. + */ + private class InternalOvsdbNodeAgent implements OvsdbAgent { + @Override + public void addConnectedNode(OvsdbNodeId nodeId, + OvsdbClientService ovsdbClient) { + + if (ovsdbClients.get(nodeId) != null) { + return; + } else { + ovsdbClients.put(nodeId, ovsdbClient); + + try { + List dbNames = ovsdbClient.listDbs().get(); + for (String dbName : dbNames) { + DatabaseSchema dbSchema; + dbSchema = ovsdbClient.getOvsdbSchema(dbName).get(); + + log.debug("Begin to monitor tables"); + String id = java.util.UUID.randomUUID().toString(); + TableUpdates updates = ovsdbClient + .monitorTables(dbName, id).get(); + + requestDbName.put(id, dbName); + requestNotification.put(id, ovsdbClient); + + if (updates != null) { + processTableUpdates(ovsdbClient, updates, + dbSchema.name()); + } + } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting to get message from ovsdb"); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Exception thrown while to get message from ovsdb"); + } + + log.debug("Add node to north"); + for (OvsdbNodeListener l : ovsdbNodeListener) { + l.nodeAdded(nodeId); + } + return; + } + } + + @Override + public void removeConnectedNode(OvsdbNodeId nodeId) { + ovsdbClients.remove(nodeId); + log.debug("Node connection is removed"); + for (OvsdbNodeListener l : ovsdbNodeListener) { + l.nodeRemoved(nodeId); + } + } + } + + /** + * Processes table updates. + * + * @param clientService OvsdbClientService instance + * @param updates TableUpdates instance + * @param dbName ovsdb database name + */ + private void processTableUpdates(OvsdbClientService clientService, + TableUpdates updates, String dbName) + throws InterruptedException { + checkNotNull(clientService, "OvsdbClientService is not null"); + + DatabaseSchema dbSchema = clientService.getDatabaseSchema(dbName); + + for (String tableName : updates.result().keySet()) { + TableUpdate update = updates.result().get(tableName); + for (UUID uuid : (Set) update.rows().keySet()) { + log.debug("Begin to process table updates uuid: {}, databaseName: {}, tableName: {}", + uuid.value(), dbName, tableName); + + Row newRow = update.getNew(uuid); + if (newRow != null) { + clientService.updateOvsdbStore(dbName, tableName, + uuid.value(), newRow); + + if (OvsdbConstant.INTERFACE.equals(tableName)) { + dispatchInterfaceEvent(clientService, + newRow, + OvsdbEvent.Type.PORT_ADDED, + dbSchema); + } + } else if (update.getOld(uuid) != null) { + if (OvsdbConstant.INTERFACE.equals(tableName)) { + Row row = clientService.getRow(OvsdbConstant.DATABASENAME, tableName, uuid.value()); + dispatchInterfaceEvent(clientService, + row, + OvsdbEvent.Type.PORT_REMOVED, + dbSchema); + } + clientService.removeRow(dbName, tableName, uuid.value()); + } + } + } + } + + /** + * Dispatches event to the north. + * + * @param clientService OvsdbClientService instance + * @param newRow a new row + * @param oldRow an old row + * @param eventType type of event + * @param dbSchema ovsdb database schema + */ + private void dispatchInterfaceEvent(OvsdbClientService clientService, + Row row, + Type eventType, + DatabaseSchema dbSchema) { + + long dpid = getDataPathid(clientService, dbSchema); + Interface intf = (Interface) TableGenerator + .getTable(dbSchema, row, OvsdbTable.INTERFACE); + if (intf == null) { + return; + } + + String portType = (String) intf.getTypeColumn().data(); + long localPort = getOfPort(intf); + if (localPort < 0) { + return; + } + String[] macAndIfaceId = getMacAndIfaceid(intf); + if (macAndIfaceId == null) { + return; + } + + EventSubject eventSubject = new DefaultEventSubject(MacAddress.valueOf( + macAndIfaceId[0]), + new HashSet(), + new OvsdbPortName(intf + .getName()), + new OvsdbPortNumber(localPort), + new OvsdbDatapathId(Long + .toString(dpid)), + new OvsdbPortType(portType), + new OvsdbIfaceId(macAndIfaceId[1])); + for (OvsdbEventListener listener : ovsdbEventListener) { + listener.handle(new OvsdbEvent(eventType, + eventSubject)); + } + } + + /** + * Gets mac and iface from the table Interface. + * + * @param intf Interface instance + * @return attachedMac, ifaceid + */ + private String[] getMacAndIfaceid(Interface intf) { + OvsdbMap ovsdbMap = (OvsdbMap) intf.getExternalIdsColumn().data(); + @SuppressWarnings("unchecked") + Map externalIds = ovsdbMap.map(); + if (externalIds == null) { + log.warn("The external_ids is null"); + return null; + } + + String attachedMac = externalIds.get(OvsdbConstant.EXTERNAL_ID_VM_MAC); + if (attachedMac == null) { + log.warn("The attachedMac is null"); + return null; + } + String ifaceid = externalIds + .get(OvsdbConstant.EXTERNAL_ID_INTERFACE_ID); + if (ifaceid == null) { + log.warn("The ifaceid is null"); + return null; + } + return new String[] {attachedMac, ifaceid}; + } + + /** + * Gets ofPorts number from table Interface. + * + * @param intf Interface instance + * @return ofport the ofport number + */ + private long getOfPort(Interface intf) { + OvsdbSet ofPortSet = (OvsdbSet) intf.getOpenFlowPortColumn().data(); + @SuppressWarnings("unchecked") + Set ofPorts = ofPortSet.set(); + while (ofPorts == null || ofPorts.size() <= 0) { + log.debug("The ofport is null in {}", intf.getName()); + return -1; + } + Iterator it = ofPorts.iterator(); + return Long.parseLong(it.next().toString()); + } + + /** + * Gets datapathid from table bridge. + * + * @param clientService OvsdbClientService instance + * @param dbSchema ovsdb database schema + * @return datapathid the bridge datapathid + */ + private long getDataPathid(OvsdbClientService clientService, + DatabaseSchema dbSchema) { + String bridgeUuid = clientService + .getBridgeUuid(OvsdbConstant.INTEGRATION_BRIDGE); + if (bridgeUuid == null) { + log.debug("Unable to spot bridge uuid for {} in {}", + OvsdbConstant.INTEGRATION_BRIDGE, clientService); + return 0; + } + + Row bridgeRow = clientService.getRow(OvsdbConstant.DATABASENAME, + "Bridge", bridgeUuid); + Bridge bridge = (Bridge) TableGenerator.getTable(dbSchema, bridgeRow, + OvsdbTable.BRIDGE); + OvsdbSet dpidSet = (OvsdbSet) bridge.getDatapathIdColumn().data(); + @SuppressWarnings("unchecked") + Set dpids = dpidSet.set(); + if (dpids == null || dpids.size() == 0) { + return 0; + } + return stringToLong((String) dpids.toArray()[0]); + } + + private long stringToLong(String values) { + long value = (new BigInteger(values.replaceAll(":", ""), 16)) + .longValue(); + return value; + } + + /** + * Implementation of an Callback which is responsible for receiving request + * infomation from ovsdb. + */ + private class InternalMonitorCallBack implements Callback { + @Override + public void update(UpdateNotification updateNotification) { + Object key = updateNotification.jsonValue(); + OvsdbClientService ovsdbClient = requestNotification.get(key); + + String dbName = requestDbName.get(key); + JsonNode updatesJson = updateNotification.tbUpdatesJsonNode(); + DatabaseSchema dbSchema = ovsdbClient.getDatabaseSchema(dbName); + TableUpdates updates = FromJsonUtil + .jsonNodeToTableUpdates(updatesJson, dbSchema); + try { + processTableUpdates(ovsdbClient, updates, dbName); + } catch (InterruptedException e) { + log.warn("Interrupted while processing table updates"); + Thread.currentThread().interrupt(); + } + } + + @Override + public void locked(List ids) { + // TODO Auto-generated method stub + } + + @Override + public void stolen(List ids) { + // TODO Auto-generated method stub + } + + } + +} diff --git a/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java new file mode 100644 index 00000000..37942c24 --- /dev/null +++ b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java @@ -0,0 +1,131 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ovsdb.controller.impl; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import org.onosproject.ovsdb.controller.OvsdbNodeId; +import org.onosproject.ovsdb.controller.driver.OvsdbProviderService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Strings; + +/** + * Channel handler deals with the node connection and dispatches + * ovsdb messages to the appropriate locations. + */ +public final class OvsdbJsonRpcHandler extends ChannelInboundHandlerAdapter { + protected static final Logger log = LoggerFactory + .getLogger(OvsdbJsonRpcHandler.class); + private OvsdbNodeId ovsdbNodeId; + private OvsdbProviderService ovsdbProviderService; + + /** + * Constructor from a OvsdbNodeId ovsdbNodeId. + * + * @param ovsdbNodeId the ovsdbNodeId to use + */ + public OvsdbJsonRpcHandler(OvsdbNodeId ovsdbNodeId) { + super(); + this.ovsdbNodeId = ovsdbNodeId; + } + + /** + * Gets the ovsdbProviderService instance. + * + * @return the instance of the ovsdbProviderService + */ + public OvsdbProviderService getOvsdbProviderService() { + return ovsdbProviderService; + } + + /** + * Sets the ovsdbProviderService instance. + * + * @param ovsdbNodeDriver the ovsdbNodeDriver to use + */ + public void setOvsdbProviderService(OvsdbProviderService ovsdbNodeDriver) { + this.ovsdbProviderService = ovsdbNodeDriver; + } + + /** + * Gets the OvsdbNodeId instance. + * + * @return the instance of the OvsdbNodeId + */ + public OvsdbNodeId getNodeId() { + return ovsdbNodeId; + } + + /** + * Sets the ovsdb node id. + * + * @param ovsdbNodeId the ovsdbNodeId to use + */ + public void setNodeId(OvsdbNodeId ovsdbNodeId) { + this.ovsdbNodeId = ovsdbNodeId; + } + + /** + * Processes an JsonNode message received on the channel. + * + * @param jsonNode The OvsdbJsonRpcHandler that received the message + */ + private void processOvsdbMessage(JsonNode jsonNode) { + + log.info("Handle ovsdb message"); + + if (jsonNode.has("result")) { + + log.debug("Handle ovsdb result"); + ovsdbProviderService.processResult(jsonNode); + + } else if (jsonNode.hasNonNull("method")) { + + log.debug("Handle ovsdb request"); + if (jsonNode.has("id") + && !Strings.isNullOrEmpty(jsonNode.get("id").asText())) { + ovsdbProviderService.processRequest(jsonNode); + } + + } + return; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + log.debug("Receive message from ovsdb"); + if (msg instanceof JsonNode) { + JsonNode jsonNode = (JsonNode) msg; + processOvsdbMessage(jsonNode); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { + log.error("Exception inside channel handling pipeline.", cause); + context.close(); + } +} diff --git a/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/package-info.java b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/package-info.java new file mode 100644 index 00000000..379e947a --- /dev/null +++ b/framework/src/onos/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the OVSDB controller IO subsystem. + */ +package org.onosproject.ovsdb.controller.impl; \ No newline at end of file -- cgit 1.2.3-korg