aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep')
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/Controller.java188
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepChannelHandler.java652
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientControllerImpl.java222
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientImpl.java220
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageDecoder.java68
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageEncoder.java58
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPacketStatsImpl.java105
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPipelineFactory.java66
-rw-r--r--framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/package-info.java20
9 files changed, 1599 insertions, 0 deletions
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/Controller.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/Controller.java
new file mode 100644
index 00000000..9c27810c
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/Controller.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.pcep.controller.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.onosproject.pcep.controller.PccId;
+import org.onosproject.pcep.controller.PcepPacketStats;
+import org.onosproject.pcep.controller.driver.PcepAgent;
+import org.onosproject.pcep.controller.driver.PcepClientDriver;
+import org.onosproject.pcepio.protocol.PcepFactories;
+import org.onosproject.pcepio.protocol.PcepFactory;
+import org.onosproject.pcepio.protocol.PcepVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The main controller class. Handles all setup and network listeners -
+ * Distributed ownership control of pcc through IControllerRegistryService
+ */
+public class Controller {
+
+ private static final Logger log = LoggerFactory.getLogger(Controller.class);
+
+ private static final PcepFactory FACTORY1 = PcepFactories.getFactory(PcepVersion.PCEP_1);
+
+ private ChannelGroup cg;
+
+ // Configuration options
+ private int pcepPort = 4189;
+ private int workerThreads = 10;
+
+ // Start time of the controller
+ private long systemStartTime;
+
+ private PcepAgent agent;
+
+ private NioServerSocketChannelFactory execFactory;
+
+ // Perf. related configuration
+ private static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
+
+ /**
+ * Returns factory version for processing pcep messages.
+ *
+ * @return instance of factory version
+ */
+ public PcepFactory getPcepMessageFactory1() {
+ return FACTORY1;
+ }
+
+ /**
+ * To get system start time.
+ *
+ * @return system start time in milliseconds
+ */
+ public long getSystemStartTime() {
+ return (this.systemStartTime);
+ }
+
+ /**
+ * Tell controller that we're ready to accept pcc connections.
+ */
+ public void run() {
+ try {
+ final ServerBootstrap bootstrap = createServerBootStrap();
+
+ bootstrap.setOption("reuseAddr", true);
+ bootstrap.setOption("child.keepAlive", true);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
+
+ ChannelPipelineFactory pfact = new PcepPipelineFactory(this);
+
+ bootstrap.setPipelineFactory(pfact);
+ InetSocketAddress sa = new InetSocketAddress(pcepPort);
+ cg = new DefaultChannelGroup();
+ cg.add(bootstrap.bind(sa));
+ log.info("Listening for PCC connection on {}", sa);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Creates server boot strap.
+ *
+ * @return ServerBootStrap
+ */
+ private ServerBootstrap createServerBootStrap() {
+ if (workerThreads == 0) {
+ execFactory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(groupedThreads("onos/pcep", "boss-%d")),
+ Executors.newCachedThreadPool(groupedThreads("onos/pcep", "worker-%d")));
+ return new ServerBootstrap(execFactory);
+ } else {
+ execFactory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(groupedThreads("onos/pcep", "boss-%d")),
+ Executors.newCachedThreadPool(groupedThreads("onos/pcep", "worker-%d")), workerThreads);
+ return new ServerBootstrap(execFactory);
+ }
+ }
+
+ /**
+ * Initialize internal data structures.
+ */
+ public void init() {
+ // These data structures are initialized here because other
+ // module's startUp() might be called before ours
+ this.systemStartTime = System.currentTimeMillis();
+ }
+
+ public Map<String, Long> getMemory() {
+ Map<String, Long> m = new HashMap<>();
+ Runtime runtime = Runtime.getRuntime();
+ m.put("total", runtime.totalMemory());
+ m.put("free", runtime.freeMemory());
+ return m;
+ }
+
+ public Long getUptime() {
+ RuntimeMXBean rb = ManagementFactory.getRuntimeMXBean();
+ return rb.getUptime();
+ }
+
+ /**
+ * Creates instance of Pcep client.
+ *
+ * @param pccId pcc identifier
+ * @param sessionID session id
+ * @param pv pcep version
+ * @param pktStats pcep packet statistics
+ * @return instance of PcepClient
+ */
+ protected PcepClientDriver getPcepClientInstance(PccId pccId, int sessionID, PcepVersion pv,
+ PcepPacketStats pktStats) {
+ PcepClientDriver pcepClientDriver = new PcepClientImpl();
+ pcepClientDriver.init(pccId, pv, pktStats);
+ pcepClientDriver.setAgent(agent);
+ return pcepClientDriver;
+ }
+
+ /**
+ * Starts the pcep controller.
+ *
+ * @param ag Pcep agent
+ */
+ public void start(PcepAgent ag) {
+ log.info("Started");
+ this.agent = ag;
+ this.init();
+ this.run();
+ }
+
+ /**
+ * Stops the pcep controller.
+ */
+ public void stop() {
+ log.info("Stopped");
+ execFactory.shutdown();
+ cg.close();
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepChannelHandler.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepChannelHandler.java
new file mode 100644
index 00000000..bc8721d7
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepChannelHandler.java
@@ -0,0 +1,652 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pcep.controller.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.timeout.IdleState;
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.onlab.packet.IpAddress;
+import org.onosproject.pcep.controller.PccId;
+import org.onosproject.pcep.controller.driver.PcepClientDriver;
+import org.onosproject.pcepio.exceptions.PcepParseException;
+import org.onosproject.pcepio.protocol.PcepError;
+import org.onosproject.pcepio.protocol.PcepErrorInfo;
+import org.onosproject.pcepio.protocol.PcepErrorMsg;
+import org.onosproject.pcepio.protocol.PcepErrorObject;
+import org.onosproject.pcepio.protocol.PcepFactory;
+import org.onosproject.pcepio.protocol.PcepMessage;
+import org.onosproject.pcepio.protocol.PcepOpenMsg;
+import org.onosproject.pcepio.protocol.PcepOpenObject;
+import org.onosproject.pcepio.protocol.PcepType;
+import org.onosproject.pcepio.protocol.PcepVersion;
+import org.onosproject.pcepio.types.ErrorObjListWithOpen;
+import org.onosproject.pcepio.types.PceccCapabilityTlv;
+import org.onosproject.pcepio.types.StatefulPceCapabilityTlv;
+import org.onosproject.pcepio.types.PcepErrorDetailInfo;
+import org.onosproject.pcepio.types.PcepValueType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Channel handler deals with the pcc client connection and dispatches
+ * messages from client to the appropriate locations.
+ */
+class PcepChannelHandler extends IdleStateAwareChannelHandler {
+ static final byte DEADTIMER_MAXIMUM_VALUE = (byte) 0xFF;
+ static final byte KEEPALIVE_MULTIPLE_FOR_DEADTIMER = 4;
+ private static final Logger log = LoggerFactory.getLogger(PcepChannelHandler.class);
+ private final Controller controller;
+ private PcepClientDriver pc;
+ private PccId thispccId;
+ private Channel channel;
+ private byte sessionId = 0;
+ private byte keepAliveTime;
+ private byte deadTime;
+ private PcepPacketStatsImpl pcepPacketStats;
+ static final int MAX_WRONG_COUNT_PACKET = 5;
+ static final int BYTE_MASK = 0xFF;
+
+ // State needs to be volatile because the HandshakeTimeoutHandler
+ // needs to check if the handshake is complete
+ private volatile ChannelState state;
+
+ // When a pcc client with a ip addresss is found (i.e we already have a
+ // connected client with the same ip), the new client is immediately
+ // disconnected. At that point netty callsback channelDisconnected() which
+ // proceeds to cleaup client state - we need to ensure that it does not cleanup
+ // client state for the older (still connected) client
+ private volatile Boolean duplicatePccIdFound;
+
+ //Indicates the pcep version used by this pcc client
+ protected PcepVersion pcepVersion;
+ protected PcepFactory factory1;
+
+ /**
+ * Create a new unconnected PcepChannelHandler.
+ * @param controller parent controller
+ */
+ PcepChannelHandler(Controller controller) {
+ this.controller = controller;
+ this.state = ChannelState.INIT;
+ factory1 = controller.getPcepMessageFactory1();
+ duplicatePccIdFound = Boolean.FALSE;
+ pcepPacketStats = new PcepPacketStatsImpl();
+ }
+
+ /**
+ * To disconnect a PCC.
+ */
+ public void disconnectClient() {
+ pc.disconnectClient();
+ }
+
+ //*************************
+ // Channel State Machine
+ //*************************
+
+ /**
+ * The state machine for handling the client/channel state. All state
+ * transitions should happen from within the state machine (and not from other
+ * parts of the code)
+ */
+ enum ChannelState {
+ /**
+ * Initial state before channel is connected.
+ */
+ INIT(false) {
+
+ },
+ /**
+ * Once the session is established, wait for open message.
+ */
+ OPENWAIT(false) {
+ @Override
+ void processPcepMessage(PcepChannelHandler h, PcepMessage m) throws IOException, PcepParseException {
+
+ log.debug("Message received in OPEN WAIT State");
+
+ //check for open message
+ if (m.getType() != PcepType.OPEN) {
+ // When the message type is not open message increment the wrong packet statistics
+ h.processUnknownMsg();
+ log.debug("message is not OPEN message");
+ } else {
+
+ h.pcepPacketStats.addInPacket();
+ PcepOpenMsg pOpenmsg = (PcepOpenMsg) m;
+ // do Capability validation.
+ if (h.capabilityValidation(pOpenmsg)) {
+ log.debug("Sending handshake OPEN message");
+ h.sessionId = pOpenmsg.getPcepOpenObject().getSessionId();
+ h.pcepVersion = pOpenmsg.getPcepOpenObject().getVersion();
+
+ //setting keepalive and deadTimer
+ byte yKeepalive = pOpenmsg.getPcepOpenObject().getKeepAliveTime();
+ byte yDeadTimer = pOpenmsg.getPcepOpenObject().getDeadTime();
+ h.keepAliveTime = yKeepalive;
+ if (yKeepalive < yDeadTimer) {
+ h.deadTime = yDeadTimer;
+ } else {
+ if (DEADTIMER_MAXIMUM_VALUE > (yKeepalive * KEEPALIVE_MULTIPLE_FOR_DEADTIMER)) {
+ h.deadTime = (byte) (yKeepalive * KEEPALIVE_MULTIPLE_FOR_DEADTIMER);
+ } else {
+ h.deadTime = DEADTIMER_MAXIMUM_VALUE;
+ }
+ }
+ h.sendHandshakeOpenMessage();
+ h.pcepPacketStats.addOutPacket();
+ h.setState(KEEPWAIT);
+ } else {
+ log.debug("Capability validation failed. Sending PCEP-ERROR message to PCC.");
+ // Send PCEP-ERROR message.
+ PcepErrorMsg errMsg = h.getErrorMsg(PcepErrorDetailInfo.ERROR_TYPE_2,
+ PcepErrorDetailInfo.ERROR_VALUE_2);
+ h.channel.write(Collections.singletonList(errMsg));
+ }
+ }
+ }
+ },
+ /**
+ * Once the open messages are exchanged, wait for keep alive message.
+ */
+ KEEPWAIT(false) {
+ @Override
+ void processPcepMessage(PcepChannelHandler h, PcepMessage m) throws IOException, PcepParseException {
+ log.debug("message received in KEEPWAIT state");
+ //check for keep alive message
+ if (m.getType() != PcepType.KEEP_ALIVE) {
+ // When the message type is not keep alive message increment the wrong packet statistics
+ h.processUnknownMsg();
+ log.debug("message is not KEEPALIVE message");
+ } else {
+ // Set the client connected status
+ h.pcepPacketStats.addInPacket();
+ final SocketAddress address = h.channel.getRemoteAddress();
+ if (!(address instanceof InetSocketAddress)) {
+ throw new IOException("Invalid client connection. Pcc is indentifed based on IP");
+ }
+ log.debug("sending keep alive message in KEEPWAIT state");
+
+ final InetSocketAddress inetAddress = (InetSocketAddress) address;
+ h.thispccId = PccId.pccId(IpAddress.valueOf(inetAddress.getAddress()));
+ h.pc = h.controller.getPcepClientInstance(h.thispccId, h.sessionId, h.pcepVersion,
+ h.pcepPacketStats);
+ // set the status of pcc as connected
+ h.pc.setConnected(true);
+ h.pc.setChannel(h.channel);
+
+ // set any other specific parameters to the pcc
+ h.pc.setPcVersion(h.pcepVersion);
+ h.pc.setPcSessionId(h.sessionId);
+ h.pc.setPcKeepAliveTime(h.keepAliveTime);
+ h.pc.setPcDeadTime(h.deadTime);
+ int keepAliveTimer = h.keepAliveTime & BYTE_MASK;
+ int deadTimer = h.deadTime & BYTE_MASK;
+ if (0 == h.keepAliveTime) {
+ h.deadTime = 0;
+ }
+ // handle keep alive and dead time
+ if (keepAliveTimer != PcepPipelineFactory.DEFAULT_KEEP_ALIVE_TIME
+ || deadTimer != PcepPipelineFactory.DEFAULT_DEAD_TIME) {
+
+ h.channel.getPipeline().replace("idle", "idle",
+ new IdleStateHandler(PcepPipelineFactory.TIMER, deadTimer, keepAliveTimer, 0));
+ }
+ log.debug("Dead timer : " + deadTimer);
+ log.debug("Keep alive time : " + keepAliveTimer);
+
+ //set the state handshake completion.
+ h.sendKeepAliveMessage();
+ h.pcepPacketStats.addOutPacket();
+ h.setHandshakeComplete(true);
+
+ if (!h.pc.connectClient()) {
+ disconnectDuplicate(h);
+ } else {
+ h.setState(ESTABLISHED);
+ }
+ }
+ }
+ },
+ /**
+ * Once the keep alive messages are exchanged, the state is established.
+ */
+ ESTABLISHED(true) {
+ @Override
+ void processPcepMessage(PcepChannelHandler h, PcepMessage m) throws IOException, PcepParseException {
+
+ //h.channel.getPipeline().remove("waittimeout");
+ log.debug("Message received in established state " + m.getType());
+ //dispatch the message
+ h.dispatchMessage(m);
+ }
+ };
+ private boolean handshakeComplete;
+
+ ChannelState(boolean handshakeComplete) {
+ this.handshakeComplete = handshakeComplete;
+ }
+
+ void processPcepMessage(PcepChannelHandler h, PcepMessage m) throws IOException, PcepParseException {
+ // do nothing
+ }
+
+ /**
+ * Is this a state in which the handshake has completed.
+ *
+ * @return true if the handshake is complete
+ */
+ public boolean isHandshakeComplete() {
+ return this.handshakeComplete;
+ }
+
+ protected void disconnectDuplicate(PcepChannelHandler h) {
+ log.error("Duplicated Pcc IP or incompleted cleanup - " + "disconnecting channel {}",
+ h.getClientInfoString());
+ h.duplicatePccIdFound = Boolean.TRUE;
+ h.channel.disconnect();
+ }
+
+ /**
+ * Sets handshake complete status.
+ *
+ * @param handshakeComplete status of handshake
+ */
+ public void setHandshakeComplete(boolean handshakeComplete) {
+ this.handshakeComplete = handshakeComplete;
+ }
+
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ channel = e.getChannel();
+ log.info("PCC connected from {}", channel.getRemoteAddress());
+
+ // Wait for open message from pcc client
+ setState(ChannelState.OPENWAIT);
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ log.info("Pcc disconnected callback for pc:{}. Cleaning up ...", getClientInfoString());
+ if (thispccId != null) {
+ if (!duplicatePccIdFound) {
+ // if the disconnected client (on this ChannelHandler)
+ // was not one with a duplicate-dpid, it is safe to remove all
+ // state for it at the controller. Notice that if the disconnected
+ // client was a duplicate-ip, calling the method below would clear
+ // all state for the original client (with the same ip),
+ // which we obviously don't want.
+ log.debug("{}:removal called", getClientInfoString());
+ if (pc != null) {
+ pc.removeConnectedClient();
+ }
+ } else {
+ // A duplicate was disconnected on this ChannelHandler,
+ // this is the same client reconnecting, but the original state was
+ // not cleaned up - XXX check liveness of original ChannelHandler
+ log.debug("{}:duplicate found", getClientInfoString());
+ duplicatePccIdFound = Boolean.FALSE;
+ }
+ } else {
+ log.warn("no pccip in channelHandler registered for " + "disconnected client {}", getClientInfoString());
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ PcepErrorMsg errMsg;
+ log.info("exceptionCaught: " + e.toString());
+
+ if (e.getCause() instanceof ReadTimeoutException) {
+ if (ChannelState.OPENWAIT == state) {
+ // When ReadTimeout timer is expired in OPENWAIT state, it is considered
+ // OpenWait timer.
+ errMsg = getErrorMsg(PcepErrorDetailInfo.ERROR_TYPE_1, PcepErrorDetailInfo.ERROR_VALUE_2);
+ log.debug("Sending PCEP-ERROR message to PCC.");
+ channel.write(Collections.singletonList(errMsg));
+ channel.close();
+ state = ChannelState.INIT;
+ return;
+ } else if (ChannelState.KEEPWAIT == state) {
+ // When ReadTimeout timer is expired in KEEPWAIT state, is is considered
+ // KeepWait timer.
+ errMsg = getErrorMsg(PcepErrorDetailInfo.ERROR_TYPE_1, PcepErrorDetailInfo.ERROR_VALUE_7);
+ log.debug("Sending PCEP-ERROR message to PCC.");
+ channel.write(Collections.singletonList(errMsg));
+ channel.close();
+ state = ChannelState.INIT;
+ return;
+ }
+ } else if (e.getCause() instanceof ClosedChannelException) {
+ log.debug("Channel for pc {} already closed", getClientInfoString());
+ } else if (e.getCause() instanceof IOException) {
+ log.error("Disconnecting client {} due to IO Error: {}", getClientInfoString(), e.getCause().getMessage());
+ if (log.isDebugEnabled()) {
+ // still print stack trace if debug is enabled
+ log.debug("StackTrace for previous Exception: ", e.getCause());
+ }
+ channel.close();
+ } else if (e.getCause() instanceof PcepParseException) {
+ PcepParseException errMsgParse = (PcepParseException) e.getCause();
+ byte errorType = errMsgParse.getErrorType();
+ byte errorValue = errMsgParse.getErrorValue();
+
+ if ((errorType == (byte) 0x0) && (errorValue == (byte) 0x0)) {
+ processUnknownMsg();
+ } else {
+ errMsg = getErrorMsg(errorType, errorValue);
+ log.debug("Sending PCEP-ERROR message to PCC.");
+ channel.write(Collections.singletonList(errMsg));
+ }
+ } else if (e.getCause() instanceof RejectedExecutionException) {
+ log.warn("Could not process message: queue full");
+ } else {
+ log.error("Error while processing message from client " + getClientInfoString() + "state " + this.state);
+ channel.close();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getClientInfoString();
+ }
+
+ @Override
+ public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
+ if (!isHandshakeComplete()) {
+ return;
+ }
+
+ if (e.getState() == IdleState.READER_IDLE) {
+ // When no message is received on channel for read timeout, then close
+ // the channel
+ log.info("Disconnecting client {} due to read timeout", getClientInfoString());
+ ctx.getChannel().close();
+ } else if (e.getState() == IdleState.WRITER_IDLE) {
+ // Send keep alive message
+ log.debug("Sending keep alive message due to IdleState timeout " + pc.toString());
+ pc.sendMessage(Collections.singletonList(pc.factory().buildKeepaliveMsg().build()));
+ }
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ if (e.getMessage() instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<PcepMessage> msglist = (List<PcepMessage>) e.getMessage();
+ for (PcepMessage pm : msglist) {
+ // Do the actual packet processing
+ state.processPcepMessage(this, pm);
+ }
+ } else {
+ state.processPcepMessage(this, (PcepMessage) e.getMessage());
+ }
+ }
+
+ /**
+ * To set the handshake status.
+ *
+ * @param handshakeComplete value is handshake status
+ */
+ public void setHandshakeComplete(boolean handshakeComplete) {
+ this.state.setHandshakeComplete(handshakeComplete);
+ }
+
+ /**
+ * Is this a state in which the handshake has completed.
+ *
+ * @return true if the handshake is complete
+ */
+ public boolean isHandshakeComplete() {
+ return this.state.isHandshakeComplete();
+ }
+
+ /**
+ * To handle the pcep message.
+ *
+ * @param m pcep message
+ */
+ private void dispatchMessage(PcepMessage m) {
+ pc.handleMessage(m);
+ }
+
+ /**
+ * Return a string describing this client based on the already available
+ * information (ip address and/or remote socket).
+ *
+ * @return display string
+ */
+ private String getClientInfoString() {
+ if (pc != null) {
+ return pc.toString();
+ }
+ String channelString;
+ if (channel == null || channel.getRemoteAddress() == null) {
+ channelString = "?";
+ } else {
+ channelString = channel.getRemoteAddress().toString();
+ }
+ String pccIpString;
+ // TODO : implement functionality to get pcc id string
+ pccIpString = "?";
+ return String.format("[%s PCCIP[%s]]", channelString, pccIpString);
+ }
+
+ /**
+ * Update the channels state. Only called from the state machine.
+ *
+ * @param state
+ */
+ private void setState(ChannelState state) {
+ this.state = state;
+ }
+
+ /**
+ * Send handshake open message.
+ *
+ * @throws IOException,PcepParseException
+ */
+ private void sendHandshakeOpenMessage() throws IOException, PcepParseException {
+ PcepOpenObject pcepOpenobj = factory1.buildOpenObject()
+ .setSessionId(sessionId)
+ .setKeepAliveTime(keepAliveTime)
+ .setDeadTime(deadTime)
+ .build();
+ PcepMessage msg = factory1.buildOpenMsg()
+ .setPcepOpenObj(pcepOpenobj)
+ .build();
+ log.debug("Sending OPEN message to {}", channel.getRemoteAddress());
+ channel.write(Collections.singletonList(msg));
+ }
+
+ /**
+ * Capability Validation.
+ *
+ * @param pOpenmsg pcep open message
+ * @return success or failure
+ */
+ private boolean capabilityValidation(PcepOpenMsg pOpenmsg) {
+ LinkedList<PcepValueType> tlvList = pOpenmsg.getPcepOpenObject().getOptionalTlv();
+ boolean bFoundPceccCapability = false;
+ boolean bFoundStatefulPceCapability = false;
+ boolean bFoundPcInstantiationCapability = false;
+
+ ListIterator<PcepValueType> listIterator = tlvList.listIterator();
+ while (listIterator.hasNext()) {
+ PcepValueType tlv = listIterator.next();
+
+ switch (tlv.getType()) {
+ case PceccCapabilityTlv.TYPE:
+ bFoundPceccCapability = true;
+ break;
+ case StatefulPceCapabilityTlv.TYPE:
+ bFoundStatefulPceCapability = true;
+ StatefulPceCapabilityTlv stetefulPcCapTlv = (StatefulPceCapabilityTlv) tlv;
+ if (stetefulPcCapTlv.getIFlag()) {
+ bFoundPcInstantiationCapability = true;
+ }
+ break;
+ default:
+ continue;
+ }
+ }
+
+ return (bFoundPceccCapability && bFoundStatefulPceCapability && bFoundPcInstantiationCapability);
+ }
+
+ /**
+ * Send keep alive message.
+ *
+ * @throws IOException when channel is disconnected
+ * @throws PcepParseException while building keep alive message
+ */
+ private void sendKeepAliveMessage() throws IOException, PcepParseException {
+ PcepMessage msg = factory1.buildKeepaliveMsg().build();
+ log.debug("Sending KEEPALIVE message to {}", channel.getRemoteAddress());
+ channel.write(Collections.singletonList(msg));
+ }
+
+ /**
+ * Send error message and close channel with pcc.
+ */
+ private void sendErrMsgAndCloseChannel() {
+ // TODO send error message
+ channel.close();
+ }
+
+ /**
+ * Send error message when an invalid message is received.
+ *
+ * @throws PcepParseException while building error message
+ */
+ private void sendErrMsgForInvalidMsg() throws PcepParseException {
+ byte errorType = 0x02;
+ byte errorValue = 0x00;
+ PcepErrorMsg errMsg = getErrorMsg(errorType, errorValue);
+ channel.write(Collections.singletonList(errMsg));
+ }
+
+ /**
+ * Builds pcep error message based on error value and error type.
+ *
+ * @param errorType pcep error type
+ * @param errorValue pcep error value
+ * @return pcep error message
+ * @throws PcepParseException while bulding error message
+ */
+ public PcepErrorMsg getErrorMsg(byte errorType, byte errorValue) throws PcepParseException {
+ LinkedList<PcepErrorObject> llerrObj = new LinkedList<>();
+ PcepErrorMsg errMsg;
+
+ PcepErrorObject errObj = factory1.buildPcepErrorObject()
+ .setErrorValue(errorValue)
+ .setErrorType(errorType)
+ .build();
+
+ llerrObj.add(errObj);
+
+ if (state == ChannelState.OPENWAIT) {
+ //If Error caught in Openmessage
+ PcepOpenObject openObj = null;
+ ErrorObjListWithOpen errorObjListWithOpen = null;
+
+ if (0 != sessionId) {
+ openObj = factory1.buildOpenObject().setSessionId(sessionId).build();
+ errorObjListWithOpen = new ErrorObjListWithOpen(llerrObj, openObj);
+ } else {
+ errorObjListWithOpen = new ErrorObjListWithOpen(llerrObj, null);
+ }
+
+ errMsg = factory1.buildPcepErrorMsg()
+ .setErrorObjListWithOpen(errorObjListWithOpen)
+ .build();
+ } else {
+
+ //If Error caught in other than Openmessage
+ LinkedList<PcepError> llPcepErr = new LinkedList<>();
+
+ PcepError pcepErr = factory1.buildPcepError()
+ .setErrorObjList(llerrObj)
+ .build();
+
+ llPcepErr.add(pcepErr);
+
+ PcepErrorInfo errInfo = factory1.buildPcepErrorInfo()
+ .setPcepErrorList(llPcepErr)
+ .build();
+
+ errMsg = factory1.buildPcepErrorMsg()
+ .setPcepErrorInfo(errInfo)
+ .build();
+ }
+ return errMsg;
+ }
+
+ /**
+ * Process unknown pcep message received.
+ *
+ * @throws PcepParseException while building pcep error message
+ */
+ public void processUnknownMsg() throws PcepParseException {
+ Date now = null;
+ if (pcepPacketStats.wrongPacketCount() == 0) {
+ now = new Date();
+ pcepPacketStats.setTime(now.getTime());
+ pcepPacketStats.addWrongPacket();
+ sendErrMsgForInvalidMsg();
+ }
+
+ if (pcepPacketStats.wrongPacketCount() > 1) {
+ Date lastest = new Date();
+ pcepPacketStats.addWrongPacket();
+ //converting to seconds
+ if (((lastest.getTime() - pcepPacketStats.getTime()) / 1000) > 60) {
+ now = lastest;
+ pcepPacketStats.setTime(now.getTime());
+ pcepPacketStats.resetWrongPacket();
+ pcepPacketStats.addWrongPacket();
+ } else if (((int) (lastest.getTime() - now.getTime()) / 1000) < 60) {
+ if (MAX_WRONG_COUNT_PACKET <= pcepPacketStats.wrongPacketCount()) {
+ //reset once wrong packet count reaches MAX_WRONG_COUNT_PACKET
+ pcepPacketStats.resetWrongPacket();
+ // max wrong packets received send error message and close the session
+ sendErrMsgAndCloseChannel();
+ }
+ }
+ }
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientControllerImpl.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientControllerImpl.java
new file mode 100644
index 00000000..00c8c694
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientControllerImpl.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.pcep.controller.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.pcep.controller.PccId;
+import org.onosproject.pcep.controller.PcepClient;
+import org.onosproject.pcep.controller.PcepClientController;
+import org.onosproject.pcep.controller.PcepClientListener;
+import org.onosproject.pcep.controller.PcepEventListener;
+import org.onosproject.pcep.controller.driver.PcepAgent;
+import org.onosproject.pcepio.protocol.PcepMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of PCEP client controller.
+ */
+@Component(immediate = true)
+@Service
+public class PcepClientControllerImpl implements PcepClientController {
+
+ private static final Logger log = LoggerFactory.getLogger(PcepClientControllerImpl.class);
+
+ protected ConcurrentHashMap<PccId, PcepClient> connectedClients =
+ new ConcurrentHashMap<>();
+
+ protected PcepClientAgent agent = new PcepClientAgent();
+ protected Set<PcepClientListener> pcepClientListener = new HashSet<>();
+
+ protected Set<PcepEventListener> pcepEventListener = Sets.newHashSet();
+
+ private final Controller ctrl = new Controller();
+
+ @Activate
+ public void activate() {
+ ctrl.start(agent);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ // Close all connected clients
+ closeConnectedClients();
+ ctrl.stop();
+ log.info("Stopped");
+ }
+
+ @Override
+ public Collection<PcepClient> getClients() {
+ return connectedClients.values();
+ }
+
+ @Override
+ public PcepClient getClient(PccId pccId) {
+ return connectedClients.get(pccId);
+ }
+
+ @Override
+ public void addListener(PcepClientListener listener) {
+ if (!pcepClientListener.contains(listener)) {
+ this.pcepClientListener.add(listener);
+ }
+ }
+
+ @Override
+ public void removeListener(PcepClientListener listener) {
+ this.pcepClientListener.remove(listener);
+ }
+
+ @Override
+ public void addEventListener(PcepEventListener listener) {
+ pcepEventListener.add(listener);
+ }
+
+ @Override
+ public void removeEventListener(PcepEventListener listener) {
+ pcepEventListener.remove(listener);
+ }
+
+ @Override
+ public void writeMessage(PccId pccId, PcepMessage msg) {
+ this.getClient(pccId).sendMessage(msg);
+ }
+
+ @Override
+ public void processClientMessage(PccId pccId, PcepMessage msg) {
+ PcepClient pc = getClient(pccId);
+
+ switch (msg.getType()) {
+ case NONE:
+ break;
+ case OPEN:
+ break;
+ case KEEP_ALIVE:
+ break;
+ case PATH_COMPUTATION_REQUEST:
+ break;
+ case PATH_COMPUTATION_REPLY:
+ break;
+ case NOTIFICATION:
+ break;
+ case ERROR:
+ break;
+ case CLOSE:
+ log.info("Sending Close Message to {" + pccId.toString() + "}");
+ pc.sendMessage(Collections.singletonList(pc.factory().buildCloseMsg().build()));
+ //now disconnect client
+ pc.disconnectClient();
+ break;
+ case REPORT:
+ for (PcepEventListener l : pcepEventListener) {
+ l.handleMessage(pccId, msg);
+ }
+ break;
+ case UPDATE:
+ for (PcepEventListener l : pcepEventListener) {
+ l.handleMessage(pccId, msg);
+ }
+ break;
+ case INITIATE:
+ for (PcepEventListener l : pcepEventListener) {
+ l.handleMessage(pccId, msg);
+ }
+ break;
+ case LABEL_UPDATE:
+ break;
+ case MAX:
+ break;
+ case END:
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void closeConnectedClients() {
+ PcepClient pc;
+ for (PccId id : connectedClients.keySet()) {
+ pc = getClient(id);
+ pc.disconnectClient();
+ }
+ }
+
+ /**
+ * Implementation of an Pcep Agent which is responsible for
+ * keeping track of connected clients and the state in which
+ * they are.
+ */
+ public class PcepClientAgent implements PcepAgent {
+
+ private final Logger log = LoggerFactory.getLogger(PcepClientAgent.class);
+
+ @Override
+ public boolean addConnectedClient(PccId pccId, PcepClient pc) {
+
+ if (connectedClients.get(pccId) != null) {
+ log.error("Trying to add connectedClient but found a previous "
+ + "value for pcc ip: {}", pccId.toString());
+ return false;
+ } else {
+ log.debug("Added Client {}", pccId.toString());
+ connectedClients.put(pccId, pc);
+ for (PcepClientListener l : pcepClientListener) {
+ l.clientConnected(pccId);
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public boolean validActivation(PccId pccId) {
+ if (connectedClients.get(pccId) == null) {
+ log.error("Trying to activate client but is not in "
+ + "connected client: pccIp {}. Aborting ..", pccId.toString());
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void removeConnectedClient(PccId pccId) {
+
+ connectedClients.remove(pccId);
+ for (PcepClientListener l : pcepClientListener) {
+ log.warn("removal for {}", pccId.toString());
+ l.clientDisconnected(pccId);
+ }
+ }
+
+ @Override
+ public void processPcepMessage(PccId pccId, PcepMessage m) {
+ processClientMessage(pccId, m);
+ }
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientImpl.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientImpl.java
new file mode 100644
index 00000000..a10ff5c8
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepClientImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pcep.controller.impl;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.jboss.netty.channel.Channel;
+import org.onlab.packet.IpAddress;
+import org.onosproject.pcep.controller.PccId;
+import org.onosproject.pcep.controller.PcepPacketStats;
+import org.onosproject.pcep.controller.driver.PcepAgent;
+import org.onosproject.pcep.controller.driver.PcepClientDriver;
+import org.onosproject.pcepio.protocol.PcepFactories;
+import org.onosproject.pcepio.protocol.PcepFactory;
+import org.onosproject.pcepio.protocol.PcepMessage;
+import org.onosproject.pcepio.protocol.PcepVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * An abstract representation of an OpenFlow switch. Can be extended by others
+ * to serve as a base for their vendor specific representation of a switch.
+ */
+public class PcepClientImpl implements PcepClientDriver {
+
+ protected final Logger log = LoggerFactory.getLogger(PcepClientImpl.class);
+
+ private static final String SHUTDOWN_MSG = "Worker has already been shutdown";
+
+ private Channel channel;
+ protected String channelId;
+
+ private boolean connected;
+ protected boolean startDriverHandshakeCalled = false;
+ protected boolean isHandShakeComplete = false;
+ protected boolean isSyncComplete = false;
+ private PccId pccId;
+ private PcepAgent agent;
+
+ private PcepVersion pcepVersion;
+ private byte keepAliveTime;
+ private byte deadTime;
+ private byte sessionId;
+ private PcepPacketStatsImpl pktStats;
+
+ @Override
+ public void init(PccId pccId, PcepVersion pcepVersion, PcepPacketStats pktStats) {
+ this.pccId = pccId;
+ this.pcepVersion = pcepVersion;
+ this.pktStats = (PcepPacketStatsImpl) pktStats;
+ }
+
+ @Override
+ public final void disconnectClient() {
+ this.channel.close();
+ }
+
+ @Override
+ public final void sendMessage(PcepMessage m) {
+ log.debug("Sending message to {}", channel.getRemoteAddress());
+ try {
+ channel.write(Collections.singletonList(m));
+ this.pktStats.addOutPacket();
+ } catch (RejectedExecutionException e) {
+ log.warn(e.getMessage());
+ if (!e.getMessage().contains(SHUTDOWN_MSG)) {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public final void sendMessage(List<PcepMessage> msgs) {
+ try {
+ channel.write(msgs);
+ this.pktStats.addOutPacket(msgs.size());
+ } catch (RejectedExecutionException e) {
+ log.warn(e.getMessage());
+ if (!e.getMessage().contains(SHUTDOWN_MSG)) {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public final boolean isConnected() {
+ return this.connected;
+ }
+
+ @Override
+ public final void setConnected(boolean connected) {
+ this.connected = connected;
+ };
+
+ @Override
+ public final void setChannel(Channel channel) {
+ this.channel = channel;
+ final SocketAddress address = channel.getRemoteAddress();
+ if (address instanceof InetSocketAddress) {
+ final InetSocketAddress inetAddress = (InetSocketAddress) address;
+ final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
+ if (ipAddress.isIp4()) {
+ channelId = ipAddress.toString() + ':' + inetAddress.getPort();
+ } else {
+ channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
+ }
+ }
+ };
+
+ @Override
+ public String channelId() {
+ return channelId;
+ }
+
+ @Override
+ public final PccId getPccId() {
+ return this.pccId;
+ }
+
+ @Override
+ public final String getStringId() {
+ return this.pccId.toString();
+ }
+
+ @Override
+ public final void setPcVersion(PcepVersion pcepVersion) {
+ this.pcepVersion = pcepVersion;
+ }
+
+ @Override
+ public void setPcKeepAliveTime(byte keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ @Override
+ public void setPcDeadTime(byte deadTime) {
+ this.deadTime = deadTime;
+ }
+
+ @Override
+ public void setPcSessionId(byte sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ @Override
+ public void setIsSyncComplete(boolean value) {
+ this.isSyncComplete = value;
+ }
+
+ @Override
+ public boolean isSyncComplete() {
+ return isSyncComplete;
+ }
+
+ @Override
+ public final void handleMessage(PcepMessage m) {
+ this.pktStats.addInPacket();
+ this.agent.processPcepMessage(pccId, m);
+ }
+
+ @Override
+ public final boolean connectClient() {
+ return this.agent.addConnectedClient(pccId, this);
+ }
+
+ @Override
+ public final void removeConnectedClient() {
+ this.agent.removeConnectedClient(pccId);
+ }
+
+ @Override
+ public PcepFactory factory() {
+ return PcepFactories.getFactory(pcepVersion);
+ }
+
+ @Override
+ public boolean isHandshakeComplete() {
+ return isHandShakeComplete;
+ }
+
+ @Override
+ public final void setAgent(PcepAgent ag) {
+ if (this.agent == null) {
+ this.agent = ag;
+ }
+ }
+
+ @Override
+ public boolean isOptical() {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("channel", channelId())
+ .add("pccId", getPccId())
+ .toString();
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageDecoder.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageDecoder.java
new file mode 100644
index 00000000..b1065891
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageDecoder.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.pcep.controller.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.onosproject.pcepio.protocol.PcepFactories;
+import org.onosproject.pcepio.protocol.PcepMessage;
+import org.onosproject.pcepio.protocol.PcepMessageReader;
+import org.onosproject.pcepio.util.HexDump;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decode an pcep message from a Channel, for use in a netty pipeline.
+ */
+public class PcepMessageDecoder extends FrameDecoder {
+
+ protected static final Logger log = LoggerFactory.getLogger(PcepMessageDecoder.class);
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buffer) throws Exception {
+ log.debug("Message received.");
+ if (!channel.isConnected()) {
+ log.info("Channel is not connected.");
+ // In testing, I see decode being called AFTER decode last.
+ // This check avoids that from reading corrupted frames
+ return null;
+ }
+
+ HexDump.pcepHexDump(buffer);
+
+ // Note that a single call to decode results in reading a single
+ // PcepMessage from the channel buffer, which is passed on to, and processed
+ // by, the controller (in PcepChannelHandler).
+ // This is different from earlier behavior (with the original pcepIO),
+ // where we parsed all the messages in the buffer, before passing on
+ // a list of the parsed messages to the controller.
+ // The performance *may or may not* not be as good as before.
+ PcepMessageReader<PcepMessage> reader = PcepFactories.getGenericReader();
+ List<PcepMessage> msgList = new LinkedList<>();
+
+ while (buffer.readableBytes() > 0) {
+ PcepMessage message = reader.readFrom(buffer);
+ msgList.add(message);
+ }
+ return msgList;
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageEncoder.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageEncoder.java
new file mode 100644
index 00000000..ae97221e
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepMessageEncoder.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.pcep.controller.impl;
+
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import org.onosproject.pcepio.protocol.PcepMessage;
+import org.onosproject.pcepio.util.HexDump;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encode an pcep message for output into a ChannelBuffer, for use in a
+ * netty pipeline.
+ */
+public class PcepMessageEncoder extends OneToOneEncoder {
+ protected static final Logger log = LoggerFactory.getLogger(PcepMessageEncoder.class);
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+ log.debug("Sending message");
+ if (!(msg instanceof List)) {
+ log.debug("Invalid msg.");
+ return msg;
+ }
+
+ @SuppressWarnings("unchecked")
+ List<PcepMessage> msglist = (List<PcepMessage>) msg;
+
+ ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
+
+ for (PcepMessage pm : msglist) {
+ pm.writeTo(buf);
+ }
+
+ HexDump.pcepHexDump(buf);
+
+ return buf;
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPacketStatsImpl.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPacketStatsImpl.java
new file mode 100644
index 00000000..f2bc51eb
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPacketStatsImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.pcep.controller.impl;
+
+import org.onosproject.pcep.controller.PcepPacketStats;
+
+/**
+ * The implementation for PCEP packet statistics.
+ */
+public class PcepPacketStatsImpl implements PcepPacketStats {
+
+ private int inPacketCount;
+ private int outPacketCount;
+ private int wrongPacketCount;
+ private long time;
+
+ /**
+ * Default constructor.
+ */
+ public PcepPacketStatsImpl() {
+ this.inPacketCount = 0;
+ this.outPacketCount = 0;
+ this.wrongPacketCount = 0;
+ this.time = 0;
+ }
+
+ @Override
+ public int outPacketCount() {
+ return outPacketCount;
+ }
+
+ @Override
+ public int inPacketCount() {
+ return inPacketCount;
+ }
+
+ @Override
+ public int wrongPacketCount() {
+ return wrongPacketCount;
+ }
+
+ /**
+ * Increments the received packet counter.
+ */
+ public void addInPacket() {
+ this.inPacketCount++;
+ }
+
+ /**
+ * Increments the sent packet counter.
+ */
+ public void addOutPacket() {
+ this.outPacketCount++;
+ }
+
+ /**
+ * Increments the sent packet counter by specified value.
+ *
+ * @param value of no of packets sent
+ */
+ public void addOutPacket(int value) {
+ this.outPacketCount = this.outPacketCount + value;
+ }
+
+ /**
+ * Increments the wrong packet counter.
+ */
+ public void addWrongPacket() {
+ this.wrongPacketCount++;
+ }
+
+ /**
+ * Resets wrong packet count.
+ */
+ public void resetWrongPacket() {
+ this.wrongPacketCount = 0;
+ }
+
+ @Override
+ public long getTime() {
+ return this.time;
+ }
+
+ /**
+ * Sets the time value.
+ *
+ * @param time long value of time
+ */
+ public void setTime(long time) {
+ this.time = time;
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPipelineFactory.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPipelineFactory.java
new file mode 100644
index 00000000..f32b87a8
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/PcepPipelineFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pcep.controller.impl;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.ExternalResourceReleasable;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+
+/**
+ * Creates a ChannelPipeline for a server-side pcep channel.
+ */
+public class PcepPipelineFactory
+ implements ChannelPipelineFactory, ExternalResourceReleasable {
+
+ protected Controller controller;
+ static final Timer TIMER = new HashedWheelTimer();
+ protected IdleStateHandler idleHandler;
+ protected ReadTimeoutHandler readTimeoutHandler;
+ static final int DEFAULT_KEEP_ALIVE_TIME = 30;
+ static final int DEFAULT_DEAD_TIME = 120;
+ static final int DEFAULT_WAIT_TIME = 60;
+
+ public PcepPipelineFactory(Controller controller) {
+ super();
+ this.controller = controller;
+ this.idleHandler = new IdleStateHandler(TIMER, DEFAULT_DEAD_TIME, DEFAULT_KEEP_ALIVE_TIME, 0);
+ this.readTimeoutHandler = new ReadTimeoutHandler(TIMER, DEFAULT_WAIT_TIME);
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ PcepChannelHandler handler = new PcepChannelHandler(controller);
+
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("pcepmessagedecoder", new PcepMessageDecoder());
+ pipeline.addLast("pcepmessageencoder", new PcepMessageEncoder());
+ pipeline.addLast("idle", idleHandler);
+ pipeline.addLast("waittimeout", readTimeoutHandler);
+ pipeline.addLast("handler", handler);
+ return pipeline;
+ }
+
+ @Override
+ public void releaseExternalResources() {
+ TIMER.stop();
+ }
+}
diff --git a/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/package-info.java b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/package-info.java
new file mode 100644
index 00000000..d86eefc3
--- /dev/null
+++ b/framework/src/onos/protocols/pcep/ctl/src/main/java/org/onosproject/pcep/controller/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the PCEP client controller subsystem.
+ */
+package org.onosproject.pcep.controller.impl;