Add TLS support in the Opendaylight Controller: 19/119/1
authorJason Ye <yisye@cisco.com>
Fri, 5 Apr 2013 23:58:47 +0000 (16:58 -0700)
committerJason Ye <yisye@cisco.com>
Fri, 5 Apr 2013 23:58:47 +0000 (16:58 -0700)
- TLS configuration is specified in file ./configuration/config.ini. To enable the TLS feature, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files. The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate entries, including switches' Certification Authority (CA) certificates. Here is the sample configuration,

secureChannelEnabled=true
controllerKeyStore=./configuration/ctlKeyStore
controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation)
controllerTrustStore=./configuration/ctlTrustStore
controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation)

- Added two message read/write services, one for clear text, one for secure communication.

- Added priority queue for message transmission. The system critical messages, such as Hello, Echo Reply etc will be treated as high priority and will be served ahead of other messages, like statistics request, discovery packets etc.

Signed-off-by: Jason Ye <yisye@cisco.com>
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/distribution/opendaylight/src/main/resources/configuration/logback.xml
opendaylight/protocol_plugins/openflow/pom.xml
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java

index 1b90fc7..5cababb 100644 (file)
@@ -48,3 +48,19 @@ org.eclipse.gemini.web.tomcat.config.path=configuration/tomcat-server.xml
 # of.listenPort=6633
 # The time (in milliseconds) the controller will wait for a response after sending a Barrier Request or a Statistic Request message (default 2000 msec)
 # of.messageResponseTimer=2000
+
+# TLS configuration
+# To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
+# The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate 
+# entries, including switches' Certification Authority (CA) certificates. For example,
+# secureChannelEnabled=true
+# controllerKeyStore=./configuration/ctlKeyStore
+# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation)
+# controllerTrustStore=./configuration/ctlTrustStore
+# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation)
+
+secureChannelEnabled=false
+controllerKeyStore=
+controllerKeyStorePassword=
+controllerTrustStore=
+controllerTrustStorePassword=
index 3c3f788..a99d17d 100644 (file)
@@ -36,6 +36,9 @@
   <logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.InventoryServiceShim" level="INFO"/>
   <logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.TopologyServices" level="INFO"/>
   <logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.TopologyServiceShim" level="INFO"/>
+  <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.Controller" level="INFO"/>
+  <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchHandler" level="INFO"/>
+  <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchIOSecureService" level="INFO"/>
   <!-- SAL  -->
   <logger name="org.opendaylight.controller.sal" level="INFO"/>
   <logger name="org.opendaylight.controller.sal.implementation" level="INFO"/>
index 418c897..614b0ff 100644 (file)
@@ -38,7 +38,8 @@
               org.apache.felix.dm,
               org.slf4j,
               org.eclipse.osgi.framework.console,
-                         org.osgi.framework
+                         org.osgi.framework,
+                         javax.net.ssl
             </Import-Package>
             <Export-Package>
                          org.opendaylight.controller.protocol_plugin.openflow.internal
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java
new file mode 100644 (file)
index 0000000..301159d
--- /dev/null
@@ -0,0 +1,45 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core;
+
+import java.util.List;
+
+import org.openflow.protocol.OFMessage;
+
+/**
+ * This interface defines low level routines to read/write messages on an open
+ * socket channel. If secure communication is desired, these methods also perform
+ * encryption and decryption of the network data.
+ */
+public interface IMessageReadWrite {
+       /**
+        * Sends the OF message out over the socket channel. For secure
+        * communication, the data will be encrypted.
+        * 
+        * @param msg OF message to be sent
+        * @throws Exception
+        */
+       public void asyncSend(OFMessage msg) throws Exception;
+
+       /**
+        * Resumes sending the remaining messages in the outgoing buffer
+        * @throws Exception
+        */
+       public void resumeSend() throws Exception;
+
+       /**
+        * Reads the incoming network data from the socket and retrieves the OF
+        * messages. For secure communication, the data will be decrypted first.
+        * 
+        * @return list of OF messages
+        * @throws Exception
+        */
+    public List<OFMessage> readMessages() throws Exception;
+}
index 0a4560e..a15c2e5 100644 (file)
@@ -66,20 +66,52 @@ public interface ISwitch {
        public Date getConnectedDate();
 
        /**
-        * Sends the message. A unique XID is generated automatically and inserted into the message.
-        * @param msg TheOF message to be sent
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. An unique XID is generated automatically and
+        * inserted into the message.
+        * 
+        * @param msg The OF message to be sent
         * @return The XID used
         */
        public Integer asyncSend(OFMessage msg);
 
        /**
-        * Sends the message with the specified XID.
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. The specified XID is inserted into the message.
+        * 
         * @param msg The OF message to be Sent
         * @param xid The XID to be used in the message
         * @return The XID used
         */
        public Integer asyncSend(OFMessage msg, int xid);
 
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. An unique XID is generated automatically and inserted into the
+        * message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
+       public Integer asyncFastSend(OFMessage msg);
+
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
+       public Integer asyncFastSend(OFMessage msg, int xid);
+
        /**
         * Sends the OF message followed by a Barrier Request with a unique XID which is automatically generated,
         * and waits for a result from the switch.
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java
new file mode 100644 (file)
index 0000000..d20bf1e
--- /dev/null
@@ -0,0 +1,142 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements methods to read/write messages over an established
+ * socket channel. The data exchange is in clear text format.
+ */
+public class MessageReadWriteService implements IMessageReadWrite {
+    private static final Logger logger = LoggerFactory
+            .getLogger(MessageReadWriteService.class);
+    private static final int bufferSize = 1024 * 1024;
+
+    private Selector selector;
+    private SelectionKey clientSelectionKey;
+    private SocketChannel socket;
+    private ByteBuffer inBuffer;
+    private ByteBuffer outBuffer;
+    private BasicFactory factory;
+
+    public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
+       this.socket = socket;
+       this.selector = selector;
+       this.factory = new BasicFactory();
+       this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
+       this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
+       this.clientSelectionKey = this.socket.register(this.selector,
+                       SelectionKey.OP_READ);
+    }
+
+       /**
+        * Sends the OF message out over the socket channel.
+        * 
+        * @param msg OF message to be sent
+        * @throws Exception
+        */
+    @Override
+    public void asyncSend(OFMessage msg) throws IOException {
+       synchronized (outBuffer) {
+               int msgLen = msg.getLengthU();
+               if (outBuffer.remaining() < msgLen) {
+                       // increase the buffer size so that it can contain this message
+                       ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
+                                       .capacity()
+                                       + msgLen);
+                       outBuffer.flip();
+                       newBuffer.put(outBuffer);
+                       outBuffer = newBuffer;
+               }
+               msg.writeTo(outBuffer);
+
+               if (!socket.isOpen()) {
+                       return;
+               }
+
+               outBuffer.flip();
+               socket.write(outBuffer);
+               outBuffer.compact();
+               if (outBuffer.position() > 0) {
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_WRITE, this);
+               }
+               logger.trace("Message sent: {}", msg.toString());
+       }
+    }
+
+       /**
+        * Resumes sending the remaining messages in the outgoing buffer
+        * @throws Exception
+        */
+    @Override
+    public void resumeSend() throws IOException {
+               synchronized (outBuffer) {
+                       if (!socket.isOpen()) {
+                               return;
+                       }
+
+               outBuffer.flip();
+               socket.write(outBuffer);
+               outBuffer.compact();
+               if (outBuffer.position() > 0) {
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_WRITE, this);
+               } else {
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_READ, this);
+               }
+        }
+    }
+
+       /**
+        * Reads the incoming network data from the socket and retrieves the OF
+        * messages.
+        * 
+        * @return list of OF messages
+        * @throws Exception
+        */
+    @Override
+    public List<OFMessage> readMessages() throws IOException {
+               if (!socket.isOpen()) {
+                       return null;
+               }
+
+               List<OFMessage> msgs = null;
+        int bytesRead = -1;        
+        bytesRead = socket.read(inBuffer);
+        if (bytesRead < 0) {
+                       throw new AsynchronousCloseException();
+        }
+
+        inBuffer.flip();
+        msgs = factory.parseMessages(inBuffer);
+        if (inBuffer.hasRemaining()) {
+            inBuffer.compact();
+        } else {
+            inBuffer.clear();
+        }
+        return msgs;
+    }
+}
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java
new file mode 100644 (file)
index 0000000..6bc4f10
--- /dev/null
@@ -0,0 +1,59 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.openflow.protocol.OFMessage;
+
+/**
+ * This class describes an OpenFlow message with priority
+ */
+class PriorityMessage {
+       OFMessage msg;
+       int priority;
+       
+       public PriorityMessage(OFMessage msg, int priority) {
+               this.msg = msg;
+               this.priority = priority;
+       }
+
+       public OFMessage getMsg() {
+               return msg;
+       }
+
+       public void setMsg(OFMessage msg) {
+               this.msg = msg;
+       }
+
+       public int getPriority() {
+               return priority;
+       }
+
+       public void setPriority(int priority) {
+               this.priority = priority;
+       }       
+       
+       @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return EqualsBuilder.reflectionEquals(this, obj);
+    }
+
+    @Override
+    public String toString() {
+        return "PriorityMessage[" + ReflectionToStringBuilder.toString(this) + "]";
+    }
+}
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java
new file mode 100644 (file)
index 0000000..627e223
--- /dev/null
@@ -0,0 +1,346 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.List;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements methods to read/write messages over an established
+ * socket channel. The data exchange is encrypted/decrypted by SSLEngine.
+ */
+public class SecureMessageReadWriteService implements IMessageReadWrite {
+    private static final Logger logger = LoggerFactory
+            .getLogger(SecureMessageReadWriteService.class);
+
+    private Selector selector;
+    private SelectionKey clientSelectionKey;
+    private SocketChannel socket;
+    private BasicFactory factory;
+
+    private SSLEngine sslEngine;
+       private SSLEngineResult sslEngineResult;        // results from sslEngine last operation
+    private ByteBuffer myAppData;                              // clear text message to be sent
+    private ByteBuffer myNetData;                      // encrypted message to be sent
+    private ByteBuffer peerAppData;                            // clear text message received from the switch
+    private ByteBuffer peerNetData;                    // encrypted message from the switch
+
+    public SecureMessageReadWriteService(SocketChannel socket, Selector selector) throws Exception {
+       this.socket = socket;
+       this.selector = selector;
+       this.factory = new BasicFactory();
+
+       createSecureChannel(socket);
+       createBuffers(sslEngine);
+    }
+
+       /**
+        * Bring up secure channel using SSL Engine
+        * 
+        * @param socket TCP socket channel
+        * @throws Exception
+        */
+    private void createSecureChannel(SocketChannel socket) throws Exception {
+       String keyStoreFile = System.getProperty("controllerKeyStore");
+       String keyStorePassword = System.getProperty("controllerKeyStorePassword");
+       String trustStoreFile = System.getProperty("controllerTrustStore");;
+       String trustStorePassword = System.getProperty("controllerTrustStorePassword");;
+
+        KeyStore ks = KeyStore.getInstance("JKS");
+        KeyStore ts = KeyStore.getInstance("JKS");
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+        ks.load(new FileInputStream(keyStoreFile), keyStorePassword.toCharArray());
+        ts.load(new FileInputStream(trustStoreFile), trustStorePassword.toCharArray());
+        kmf.init(ks, keyStorePassword.toCharArray());
+        tmf.init(ts);
+
+        SecureRandom random = new SecureRandom();
+        random.nextInt();
+
+       SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random);
+       sslEngine = sslContext.createSSLEngine();
+       sslEngine.setUseClientMode(false);
+       sslEngine.setNeedClientAuth(true);
+       
+       // Do initial handshake
+       doHandshake(socket, sslEngine);
+       
+        this.clientSelectionKey = this.socket.register(this.selector,
+                SelectionKey.OP_READ);
+    }
+
+       /**
+        * Sends the OF message out over the socket channel. The message is
+        * encrypted by SSL Engine.
+        * 
+        * @param msg OF message to be sent
+        * @throws Exception
+        */
+    @Override
+    public void asyncSend(OFMessage msg) throws Exception {
+       synchronized (myAppData) {
+               int msgLen = msg.getLengthU();
+               if (myAppData.remaining() < msgLen) {
+                       // increase the buffer size so that it can contain this message
+                       ByteBuffer newBuffer = ByteBuffer.allocateDirect(myAppData
+                                       .capacity()
+                                       + msgLen);
+                       myAppData.flip();
+                       newBuffer.put(myAppData);
+                       myAppData = newBuffer;
+               }
+               msg.writeTo(myAppData);
+               myAppData.flip();
+               sslEngineResult = sslEngine.wrap(myAppData, myNetData);
+               logger.trace("asyncSend sslEngine wrap: {}", sslEngineResult);
+               runDelegatedTasks(sslEngineResult, sslEngine);
+
+               if (!socket.isOpen()) {
+                       return;
+               }
+
+               myNetData.flip();
+               socket.write(myNetData);
+               if (myNetData.hasRemaining()) {
+                       myNetData.compact();
+               } else {
+                       myNetData.clear();
+               }
+
+               if (myAppData.hasRemaining()) {
+                       myAppData.compact();
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_WRITE, this);
+               } else {
+                       myAppData.clear();
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_READ, this);
+               }
+
+               logger.trace("Message sent: {}", msg.toString());
+       }
+    }
+
+       /**
+        * Resumes sending the remaining messages in the outgoing buffer
+        * @throws Exception
+        */
+    @Override
+    public void resumeSend() throws Exception {
+               synchronized (myAppData) {
+                       myAppData.flip();
+                       sslEngineResult = sslEngine.wrap(myAppData, myNetData);
+                       logger.trace("resumeSend sslEngine wrap: {}", sslEngineResult);
+                       runDelegatedTasks(sslEngineResult, sslEngine);
+
+                       if (!socket.isOpen()) {
+                               return;
+                       }
+
+                       myNetData.flip();
+                       socket.write(myNetData);
+                       if (myNetData.hasRemaining()) {
+                               myNetData.compact();
+                       } else {
+                               myNetData.clear();
+                       }
+
+                       if (myAppData.hasRemaining()) {
+                               myAppData.compact();
+                               this.clientSelectionKey = this.socket.register(this.selector,
+                                               SelectionKey.OP_WRITE, this);
+                       } else {
+                               myAppData.clear();
+                               this.clientSelectionKey = this.socket.register(this.selector,
+                                               SelectionKey.OP_READ, this);
+                       }
+               }
+    }
+
+       /**
+        * Reads the incoming network data from the socket, decryptes them and then
+        * retrieves the OF messages.
+        * 
+        * @return list of OF messages
+        * @throws Exception
+        */
+    @Override
+    public List<OFMessage> readMessages() throws Exception {
+               if (!socket.isOpen()) {
+                       return null;
+               }
+
+               List<OFMessage> msgs = null;
+        int bytesRead = -1;
+       int countDown = 50;             
+
+       bytesRead = socket.read(peerNetData);
+       if (bytesRead < 0) {
+                       throw new AsynchronousCloseException();
+       }
+
+       do {                    
+               peerNetData.flip();
+               sslEngineResult = sslEngine.unwrap(peerNetData, peerAppData);
+               if (peerNetData.hasRemaining()) {
+                       peerNetData.compact();
+               } else {
+                       peerNetData.clear();
+               }
+               logger.trace("sslEngine unwrap result: {}", sslEngineResult);
+               runDelegatedTasks(sslEngineResult, sslEngine);
+       } while ((sslEngineResult.getStatus() == SSLEngineResult.Status.OK) &&
+                         peerNetData.hasRemaining() && (--countDown > 0));
+       
+       if (countDown == 0) {
+               logger.trace("countDown reaches 0. peerNetData pos {} lim {}", peerNetData.position(), peerNetData.limit());
+       }
+
+       peerAppData.flip();
+       msgs = factory.parseMessages(peerAppData);
+       if (peerAppData.hasRemaining()) {
+               peerAppData.compact();
+       } else {
+               peerAppData.clear();
+       }
+
+       this.clientSelectionKey = this.socket.register(
+                       this.selector, SelectionKey.OP_READ, this);
+        
+        return msgs;
+    }
+
+    /**
+     *  If the result indicates that we have outstanding tasks to do,
+     *  go ahead and run them in this thread.
+     */
+    private void runDelegatedTasks(SSLEngineResult result,
+               SSLEngine engine) throws Exception {
+
+       if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+               Runnable runnable;
+               while ((runnable = engine.getDelegatedTask()) != null) {
+                       logger.debug("\trunning delegated task...");
+                       runnable.run();
+               }
+               HandshakeStatus hsStatus = engine.getHandshakeStatus();
+               if (hsStatus == HandshakeStatus.NEED_TASK) {
+                       throw new Exception(
+                                       "handshake shouldn't need additional tasks");
+               }
+               logger.debug("\tnew HandshakeStatus: {}", hsStatus);
+       }
+    }
+
+    private void doHandshake(SocketChannel socket, SSLEngine engine) throws Exception {
+       SSLSession session = engine.getSession();
+       ByteBuffer myAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       ByteBuffer peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       ByteBuffer myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+       ByteBuffer peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+
+       // Begin handshake
+       engine.beginHandshake();
+       SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus();
+
+       // Process handshaking message
+       while (hs != SSLEngineResult.HandshakeStatus.FINISHED &&
+                  hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+               switch (hs) {
+               case NEED_UNWRAP:
+                       // Receive handshaking data from peer
+                       if (socket.read(peerNetData) < 0) {
+                               throw new AsynchronousCloseException();
+                       }
+
+                       // Process incoming handshaking data
+                       peerNetData.flip();
+                       SSLEngineResult res = engine.unwrap(peerNetData, peerAppData);
+                       peerNetData.compact();
+                       hs = res.getHandshakeStatus();
+
+                       // Check status
+                       switch (res.getStatus()) {
+                       case OK :
+                               // Handle OK status
+                               break;
+                       }
+                       break;
+
+               case NEED_WRAP :
+                       // Empty the local network packet buffer.
+                       myNetData.clear();
+
+                       // Generate handshaking data
+                       res = engine.wrap(myAppData, myNetData);
+                       hs = res.getHandshakeStatus();
+
+                       // Check status
+                       switch (res.getStatus()) {
+                       case OK :
+                               myNetData.flip();
+
+                               // Send the handshaking data to peer
+                               while (myNetData.hasRemaining()) {
+                                       if (socket.write(myNetData) < 0) {
+                                       throw new AsynchronousCloseException();
+                                       }
+                               }
+                               break;
+                       }
+                       break;
+
+               case NEED_TASK :
+                       // Handle blocking tasks
+                       Runnable runnable;
+                       while ((runnable = engine.getDelegatedTask()) != null) {
+                               logger.debug("\trunning delegated task...");
+                               runnable.run();
+                       }
+                       hs = engine.getHandshakeStatus();
+                       if (hs == HandshakeStatus.NEED_TASK) {
+                               throw new Exception(
+                                               "handshake shouldn't need additional tasks");
+                       }
+                       logger.debug("\tnew HandshakeStatus: {}", hs);
+                       break;
+               }
+       }
+    }
+    
+    private void createBuffers(SSLEngine engine) {
+       SSLSession session = engine.getSession();
+       this.myAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       this.peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       this.myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+       this.peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+    }
+}
index 8881fb5..4520375 100644 (file)
@@ -9,12 +9,13 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 
-import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,11 +29,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
 import org.openflow.protocol.OFBarrierReply;
 import org.openflow.protocol.OFEchoReply;
 import org.openflow.protocol.OFError;
@@ -63,7 +66,6 @@ public class SwitchHandler implements ISwitch {
     private static final int SWITCH_LIVENESS_TIMER = 5000;
     private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
     private int MESSAGE_RESPONSE_TIMER = 2000;
-    private static final int bufferSize = 1024 * 1024;
 
     private String instanceName;
     private ISwitch thisISwitch;
@@ -74,10 +76,7 @@ public class SwitchHandler implements ISwitch {
     private Byte tables;
     private Integer actions;
     private Selector selector;
-    private SelectionKey clientSelectionKey;
     private SocketChannel socket;
-    private ByteBuffer inBuffer;
-    private ByteBuffer outBuffer;
     private BasicFactory factory;
     private AtomicInteger xid;
     private SwitchState state;
@@ -90,9 +89,12 @@ public class SwitchHandler implements ISwitch {
     private ExecutorService executor;
     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
     private boolean running;
+    private IMessageReadWrite msgReadWriteService;
     private Thread switchHandlerThread;
     private Integer responseTimerValue;
-
+       private PriorityBlockingQueue<PriorityMessage> transmitQ;
+    private Thread transmitThread;
+    
     private enum SwitchState {
         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
                 3);
@@ -130,44 +132,37 @@ public class SwitchHandler implements ISwitch {
         this.periodicTimer = null;
         this.executor = Executors.newFixedThreadPool(4);
         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
-        this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
-        this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
         String rTimer = System.getProperty("of.messageResponseTimer");
         if (rTimer != null) {
-            try {
-                responseTimerValue = Integer.decode(rTimer);
-            } catch (NumberFormatException e) {
-                logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default("
-                        + MESSAGE_RESPONSE_TIMER+ ")");
-            }
+               try {
+                       responseTimerValue = Integer.decode(rTimer);
+               } catch (NumberFormatException e) {
+                               logger.warn("Invalid of.messageResponseTimer: {} use default({})",
+                                               rTimer, MESSAGE_RESPONSE_TIMER);
+               }
         }
-    }
+       }
 
     public void start() {
         try {
-            this.selector = SelectorProvider.provider().openSelector();
-            this.socket.configureBlocking(false);
-            this.socket.socket().setTcpNoDelay(true);
-            this.clientSelectionKey = this.socket.register(this.selector,
-                    SelectionKey.OP_READ);
+               startTransmitThread();
+               setupCommChannel();
+               sendFirstHello();
             startHandlerThread();
         } catch (Exception e) {
             reportError(e);
-            return;
         }
     }
 
     private void startHandlerThread() {
-        OFMessage msg = factory.getMessage(OFType.HELLO);
-        asyncSend(msg);
         switchHandlerThread = new Thread(new Runnable() {
             @Override
             public void run() {
                 running = true;
                 while (running) {
                     try {
-                        // wait for an incoming connection
+                       // wait for an incoming connection
                         selector.select(0);
                         Iterator<SelectionKey> selectedKeys = selector
                                 .selectedKeys().iterator();
@@ -195,7 +190,7 @@ public class SwitchHandler implements ISwitch {
             running = false;
             selector.wakeup();
             cancelSwitchTimer();
-            this.clientSelectionKey.cancel();
+            this.selector.close();
             this.socket.close();
             executor.shutdown();
         } catch (Exception e) {
@@ -209,77 +204,94 @@ public class SwitchHandler implements ISwitch {
         return this.xid.incrementAndGet();
     }
 
+       /**
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. An unique XID is generated automatically and
+        * inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
     @Override
     public Integer asyncSend(OFMessage msg) {
-        return asyncSend(msg, getNextXid());
+       return asyncSend(msg, getNextXid());
+    }
+
+       /**
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be Sent
+        * @param xid The XID to be used in the message
+        * @return The XID used
+        */
+    @Override
+    public Integer asyncSend(OFMessage msg, int xid) {
+       msg.setXid(xid);
+       transmitQ.add(new PriorityMessage(msg, 0));
+        return xid;
     }
 
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. An unique XID is generated automatically and inserted into the
+        * message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
     @Override
-    public Integer asyncSend(OFMessage msg, int xid) {
-        synchronized (outBuffer) {
-            /*
-            if ((msg.getType() != OFType.ECHO_REQUEST) &&
-                       (msg.getType() != OFType.ECHO_REPLY)) {
-               logger.debug("sending " + msg.getType().toString() + " to " + toString());
-            }
-             */
-            msg.setXid(xid);
-            int msgLen = msg.getLengthU();
-            if (outBuffer.remaining() < msgLen) {
-                // increase the buffer size so that it can contain this message
-                ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
-                        .capacity()
-                        + msgLen);
-                outBuffer.flip();
-                newBuffer.put(outBuffer);
-                outBuffer = newBuffer;
-            }
-            msg.writeTo(outBuffer);
-            outBuffer.flip();
-            try {
-                socket.write(outBuffer);
-                outBuffer.compact();
-                if (outBuffer.position() > 0) {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_WRITE, this);
-                }
-                logger.trace("Message sent: " + msg.toString());
-            } catch (Exception e) {
-                reportError(e);
-            }
-        }
+    public Integer asyncFastSend(OFMessage msg) {
+       return asyncFastSend(msg, getNextXid());
+    }
+
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
+    @Override
+    public Integer asyncFastSend(OFMessage msg, int xid) {
+       msg.setXid(xid);
+       transmitQ.add(new PriorityMessage(msg, 1));
         return xid;
     }
 
-    public void resumeSend() {
-        synchronized (outBuffer) {
-            try {
-                outBuffer.flip();
-                socket.write(outBuffer);
-                outBuffer.compact();
-                if (outBuffer.position() > 0) {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_WRITE, this);
-                } else {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_READ, this);
-                }
-            } catch (Exception e) {
-                reportError(e);
-            }
-        }
+   public void resumeSend() {
+        try {
+                       msgReadWriteService.resumeSend();
+               } catch (Exception e) {
+                       reportError(e);
+               }
     }
 
     public void handleMessages() {
-        List<OFMessage> msgs = readMessages();
+        List<OFMessage> msgs = null;
+        
+        try {
+               msgs = msgReadWriteService.readMessages();
+               } catch (Exception e) {
+                       reportError(e);
+               }
+               
         if (msgs == null) {
-            logger.debug(toString() + " is down");
+            logger.debug("{} is down", toString());
             // the connection is down, inform core
             reportSwitchStateChange(false);
             return;
         }
         for (OFMessage msg : msgs) {
-            logger.trace("Message received: " + msg.toString());
+            logger.trace("Message received: {}", msg.toString());
             /*
             if  ((msg.getType() != OFType.ECHO_REQUEST) &&
                        (msg.getType() != OFType.ECHO_REPLY)) {
@@ -293,7 +305,7 @@ public class SwitchHandler implements ISwitch {
                 // send feature request
                 OFMessage featureRequest = factory
                         .getMessage(OFType.FEATURES_REQUEST);
-                asyncSend(featureRequest);
+                asyncFastSend(featureRequest);
                 // delete all pre-existing flows
                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
                 OFFlowMod flowMod = (OFFlowMod) factory
@@ -301,14 +313,14 @@ public class SwitchHandler implements ISwitch {
                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
                         .setOutPort(OFPort.OFPP_NONE).setLength(
                                 (short) OFFlowMod.MINIMUM_LENGTH);
-                asyncSend(flowMod);
+                asyncFastSend(flowMod);
                 this.state = SwitchState.WAIT_FEATURES_REPLY;
                 startSwitchTimer();
                 break;
             case ECHO_REQUEST:
                 OFEchoReply echoReply = (OFEchoReply) factory
                         .getMessage(OFType.ECHO_REPLY);
-                asyncSend(echoReply);
+                asyncFastSend(echoReply);
                 break;
             case ECHO_REPLY:
                 this.probeSent = false;
@@ -362,28 +374,6 @@ public class SwitchHandler implements ISwitch {
 
     }
 
-    private List<OFMessage> readMessages() {
-        List<OFMessage> msgs = null;
-        int bytesRead;
-        try {
-            bytesRead = socket.read(inBuffer);
-        } catch (Exception e) {
-            reportError(e);
-            return null;
-        }
-        if (bytesRead == -1) {
-            return null;
-        }
-        inBuffer.flip();
-        msgs = factory.parseMessages(inBuffer);
-        if (inBuffer.hasRemaining()) {
-            inBuffer.compact();
-        } else {
-            inBuffer.clear();
-        }
-        return msgs;
-    }
-
     private void startSwitchTimer() {
         this.periodicTimer = new Timer();
         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
@@ -394,8 +384,7 @@ public class SwitchHandler implements ISwitch {
                     if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
                         if (probeSent) {
                             // switch failed to respond to our probe, consider it down
-                            logger.warn(toString()
-                                    + " is idle for too long, disconnect");
+                            logger.warn("{} is idle for too long, disconnect", toString());
                             reportSwitchStateChange(false);
                         } else {
                             // send a probe to see if the switch is still alive
@@ -403,14 +392,14 @@ public class SwitchHandler implements ISwitch {
                             probeSent = true;
                             OFMessage echo = factory
                                     .getMessage(OFType.ECHO_REQUEST);
-                            asyncSend(echo);
+                            asyncFastSend(echo);
                         }
                     } else {
                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
                             // send another features request
                             OFMessage request = factory
                                     .getMessage(OFType.FEATURES_REQUEST);
-                            asyncSend(request);
+                            asyncFastSend(request);
                         } else {
                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
                                 //  send another config request
@@ -418,10 +407,10 @@ public class SwitchHandler implements ISwitch {
                                         .getMessage(OFType.SET_CONFIG);
                                 config.setMissSendLength((short) 0xffff)
                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
-                                asyncSend(config);
+                                asyncFastSend(config);
                                 OFMessage getConfig = factory
                                         .getMessage(OFType.GET_CONFIG_REQUEST);
-                                asyncSend(getConfig);
+                                asyncFastSend(getConfig);
                             }
                         }
                     }
@@ -439,8 +428,8 @@ public class SwitchHandler implements ISwitch {
     }
 
     private void reportError(Exception e) {
-        //logger.error(toString() + " caught Error " + e.toString());
-        // notify core of this error event
+        logger.debug("Caught exception ", e);
+        // notify core of this error event and disconnect the switch
         ((Controller) core).takeSwitchEventError(this);
     }
 
@@ -473,10 +462,10 @@ public class SwitchHandler implements ISwitch {
                     .getMessage(OFType.SET_CONFIG);
             config.setMissSendLength((short) 0xffff).setLengthU(
                     OFSetConfig.MINIMUM_LENGTH);
-            asyncSend(config);
+            asyncFastSend(config);
             // send config request to make sure the switch can handle the set config
             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
-            asyncSend(getConfig);
+            asyncFastSend(getConfig);
             this.state = SwitchState.WAIT_CONFIG_REPLY;
             // inform core that a new switch is now operational
             reportSwitchStateChange(true);
@@ -545,8 +534,7 @@ public class SwitchHandler implements ISwitch {
                     .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for " + req.getType()
-                    + " replies");
+            logger.warn("Timeout while waiting for {} replies", req.getType());
             result = null; // to indicate timeout has occurred
             return result;
         }
@@ -573,13 +561,12 @@ public class SwitchHandler implements ISwitch {
             } else {
                 // if result  is not null, this means the switch can't handle this message
                 // the result if OFError already
-                logger.debug("Send " + msg.getType().toString()
-                        + " failed --> " + ((OFError) result).toString());
+                logger.debug("Send {} failed --> {}", 
+                               msg.getType().toString(), ((OFError) result).toString());
             }
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for " + msg.getType().toString()
-                    + " reply");
+            logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
             // convert the result into a Boolean with value false
             status = false;
             result = status;
@@ -637,7 +624,7 @@ public class SwitchHandler implements ISwitch {
             worker.wakeup();
         }
     }
-
+    
     @Override
     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
         return this.physicalPorts;
@@ -715,4 +702,65 @@ public class SwitchHandler implements ISwitch {
         }
         return result;
     }
+
+       /*
+        * Transmit thread polls the message out of the priority queue and invokes
+        * messaging service to transmit it over the socket channel
+        */
+    class PriorityMessageTransmit implements Runnable {
+        public void run() {
+            while (true) {
+               try {
+                       if (!transmitQ.isEmpty()) {
+                               PriorityMessage pmsg = transmitQ.poll();
+                               msgReadWriteService.asyncSend(pmsg.msg);
+                               logger.trace("Message sent: {}", pmsg.toString());
+                       }
+                       Thread.sleep(10);
+               } catch (Exception e) {
+                       reportError(e);
+               }
+            }
+        }
+    }
+
+    /*
+     * Setup and start the transmit thread
+     */
+    private void startTransmitThread() {       
+        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, 
+                               new Comparator<PriorityMessage>() {
+                                       public int compare(PriorityMessage p1, PriorityMessage p2) {
+                                               return p2.priority - p1.priority;
+                                       }
+                               });
+        this.transmitThread = new Thread(new PriorityMessageTransmit());
+        this.transmitThread.start();
+    }
+    
+    /*
+     * Setup communication services
+     */
+    private void setupCommChannel() throws Exception {
+        this.selector = SelectorProvider.provider().openSelector();
+        this.socket.configureBlocking(false);
+        this.socket.socket().setTcpNoDelay(true);        
+        this.msgReadWriteService = getMessageReadWriteService();
+    }
+
+    private void sendFirstHello() {
+       try {
+               OFMessage msg = factory.getMessage(OFType.HELLO);
+               asyncFastSend(msg);
+       } catch (Exception e) {
+               reportError(e);
+       }
+    }
+    
+    private IMessageReadWrite getMessageReadWriteService() throws Exception {
+       String str = System.getProperty("secureChannelEnabled");
+        return ((str != null) && (str.equalsIgnoreCase("true"))) ? 
+                       new SecureMessageReadWriteService(socket, selector) : 
+                       new MessageReadWriteService(socket, selector);
+    }
 }