Merge "Add TLS support in the Opendaylight Controller: - TLS configuration is specifi...
authorAlessandro Boch <aboch@cisco.com>
Sat, 6 Apr 2013 00:50:18 +0000 (00:50 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 6 Apr 2013 00:50:18 +0000 (00:50 +0000)
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/distribution/opendaylight/src/main/resources/configuration/logback.xml
opendaylight/protocol_plugins/openflow/pom.xml
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java

index 1b90fc77980413422f1f782fb948d487fe6ad0be..5cababb85b461b697464dc44b7134e829e0c90ca 100644 (file)
@@ -48,3 +48,19 @@ org.eclipse.gemini.web.tomcat.config.path=configuration/tomcat-server.xml
 # of.listenPort=6633
 # The time (in milliseconds) the controller will wait for a response after sending a Barrier Request or a Statistic Request message (default 2000 msec)
 # of.messageResponseTimer=2000
+
+# TLS configuration
+# To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
+# The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate 
+# entries, including switches' Certification Authority (CA) certificates. For example,
+# secureChannelEnabled=true
+# controllerKeyStore=./configuration/ctlKeyStore
+# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation)
+# controllerTrustStore=./configuration/ctlTrustStore
+# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation)
+
+secureChannelEnabled=false
+controllerKeyStore=
+controllerKeyStorePassword=
+controllerTrustStore=
+controllerTrustStorePassword=
index 3c3f78889f4cc726491e77b41f14e670d08799af..a99d17d37c25de4816fc108da4a0e5879326fa2b 100644 (file)
@@ -36,6 +36,9 @@
   <logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.InventoryServiceShim" level="INFO"/>
   <logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.TopologyServices" level="INFO"/>
   <logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.TopologyServiceShim" level="INFO"/>
+  <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.Controller" level="INFO"/>
+  <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchHandler" level="INFO"/>
+  <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchIOSecureService" level="INFO"/>
   <!-- SAL  -->
   <logger name="org.opendaylight.controller.sal" level="INFO"/>
   <logger name="org.opendaylight.controller.sal.implementation" level="INFO"/>
index 418c897b52134c6e20b3b57e44acd65b07b244cd..614b0ff81826bea3b74c7d4219bdea0c7b653aca 100644 (file)
@@ -38,7 +38,8 @@
               org.apache.felix.dm,
               org.slf4j,
               org.eclipse.osgi.framework.console,
-                         org.osgi.framework
+                         org.osgi.framework,
+                         javax.net.ssl
             </Import-Package>
             <Export-Package>
                          org.opendaylight.controller.protocol_plugin.openflow.internal
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java
new file mode 100644 (file)
index 0000000..301159d
--- /dev/null
@@ -0,0 +1,45 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core;
+
+import java.util.List;
+
+import org.openflow.protocol.OFMessage;
+
+/**
+ * This interface defines low level routines to read/write messages on an open
+ * socket channel. If secure communication is desired, these methods also perform
+ * encryption and decryption of the network data.
+ */
+public interface IMessageReadWrite {
+       /**
+        * Sends the OF message out over the socket channel. For secure
+        * communication, the data will be encrypted.
+        * 
+        * @param msg OF message to be sent
+        * @throws Exception
+        */
+       public void asyncSend(OFMessage msg) throws Exception;
+
+       /**
+        * Resumes sending the remaining messages in the outgoing buffer
+        * @throws Exception
+        */
+       public void resumeSend() throws Exception;
+
+       /**
+        * Reads the incoming network data from the socket and retrieves the OF
+        * messages. For secure communication, the data will be decrypted first.
+        * 
+        * @return list of OF messages
+        * @throws Exception
+        */
+    public List<OFMessage> readMessages() throws Exception;
+}
index 0a4560ee0f68a1fc7eb48662dcc178a5f3184f25..a15c2e5c715c95d89203a15b25dc95ebc185506c 100644 (file)
@@ -66,20 +66,52 @@ public interface ISwitch {
        public Date getConnectedDate();
 
        /**
-        * Sends the message. A unique XID is generated automatically and inserted into the message.
-        * @param msg TheOF message to be sent
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. An unique XID is generated automatically and
+        * inserted into the message.
+        * 
+        * @param msg The OF message to be sent
         * @return The XID used
         */
        public Integer asyncSend(OFMessage msg);
 
        /**
-        * Sends the message with the specified XID.
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. The specified XID is inserted into the message.
+        * 
         * @param msg The OF message to be Sent
         * @param xid The XID to be used in the message
         * @return The XID used
         */
        public Integer asyncSend(OFMessage msg, int xid);
 
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. An unique XID is generated automatically and inserted into the
+        * message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
+       public Integer asyncFastSend(OFMessage msg);
+
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
+       public Integer asyncFastSend(OFMessage msg, int xid);
+
        /**
         * Sends the OF message followed by a Barrier Request with a unique XID which is automatically generated,
         * and waits for a result from the switch.
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java
new file mode 100644 (file)
index 0000000..d20bf1e
--- /dev/null
@@ -0,0 +1,142 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements methods to read/write messages over an established
+ * socket channel. The data exchange is in clear text format.
+ */
+public class MessageReadWriteService implements IMessageReadWrite {
+    private static final Logger logger = LoggerFactory
+            .getLogger(MessageReadWriteService.class);
+    private static final int bufferSize = 1024 * 1024;
+
+    private Selector selector;
+    private SelectionKey clientSelectionKey;
+    private SocketChannel socket;
+    private ByteBuffer inBuffer;
+    private ByteBuffer outBuffer;
+    private BasicFactory factory;
+
+    public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
+       this.socket = socket;
+       this.selector = selector;
+       this.factory = new BasicFactory();
+       this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
+       this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
+       this.clientSelectionKey = this.socket.register(this.selector,
+                       SelectionKey.OP_READ);
+    }
+
+       /**
+        * Sends the OF message out over the socket channel.
+        * 
+        * @param msg OF message to be sent
+        * @throws Exception
+        */
+    @Override
+    public void asyncSend(OFMessage msg) throws IOException {
+       synchronized (outBuffer) {
+               int msgLen = msg.getLengthU();
+               if (outBuffer.remaining() < msgLen) {
+                       // increase the buffer size so that it can contain this message
+                       ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
+                                       .capacity()
+                                       + msgLen);
+                       outBuffer.flip();
+                       newBuffer.put(outBuffer);
+                       outBuffer = newBuffer;
+               }
+               msg.writeTo(outBuffer);
+
+               if (!socket.isOpen()) {
+                       return;
+               }
+
+               outBuffer.flip();
+               socket.write(outBuffer);
+               outBuffer.compact();
+               if (outBuffer.position() > 0) {
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_WRITE, this);
+               }
+               logger.trace("Message sent: {}", msg.toString());
+       }
+    }
+
+       /**
+        * Resumes sending the remaining messages in the outgoing buffer
+        * @throws Exception
+        */
+    @Override
+    public void resumeSend() throws IOException {
+               synchronized (outBuffer) {
+                       if (!socket.isOpen()) {
+                               return;
+                       }
+
+               outBuffer.flip();
+               socket.write(outBuffer);
+               outBuffer.compact();
+               if (outBuffer.position() > 0) {
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_WRITE, this);
+               } else {
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_READ, this);
+               }
+        }
+    }
+
+       /**
+        * Reads the incoming network data from the socket and retrieves the OF
+        * messages.
+        * 
+        * @return list of OF messages
+        * @throws Exception
+        */
+    @Override
+    public List<OFMessage> readMessages() throws IOException {
+               if (!socket.isOpen()) {
+                       return null;
+               }
+
+               List<OFMessage> msgs = null;
+        int bytesRead = -1;        
+        bytesRead = socket.read(inBuffer);
+        if (bytesRead < 0) {
+                       throw new AsynchronousCloseException();
+        }
+
+        inBuffer.flip();
+        msgs = factory.parseMessages(inBuffer);
+        if (inBuffer.hasRemaining()) {
+            inBuffer.compact();
+        } else {
+            inBuffer.clear();
+        }
+        return msgs;
+    }
+}
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java
new file mode 100644 (file)
index 0000000..6bc4f10
--- /dev/null
@@ -0,0 +1,59 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.openflow.protocol.OFMessage;
+
+/**
+ * This class describes an OpenFlow message with priority
+ */
+class PriorityMessage {
+       OFMessage msg;
+       int priority;
+       
+       public PriorityMessage(OFMessage msg, int priority) {
+               this.msg = msg;
+               this.priority = priority;
+       }
+
+       public OFMessage getMsg() {
+               return msg;
+       }
+
+       public void setMsg(OFMessage msg) {
+               this.msg = msg;
+       }
+
+       public int getPriority() {
+               return priority;
+       }
+
+       public void setPriority(int priority) {
+               this.priority = priority;
+       }       
+       
+       @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return EqualsBuilder.reflectionEquals(this, obj);
+    }
+
+    @Override
+    public String toString() {
+        return "PriorityMessage[" + ReflectionToStringBuilder.toString(this) + "]";
+    }
+}
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java
new file mode 100644 (file)
index 0000000..627e223
--- /dev/null
@@ -0,0 +1,346 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.List;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements methods to read/write messages over an established
+ * socket channel. The data exchange is encrypted/decrypted by SSLEngine.
+ */
+public class SecureMessageReadWriteService implements IMessageReadWrite {
+    private static final Logger logger = LoggerFactory
+            .getLogger(SecureMessageReadWriteService.class);
+
+    private Selector selector;
+    private SelectionKey clientSelectionKey;
+    private SocketChannel socket;
+    private BasicFactory factory;
+
+    private SSLEngine sslEngine;
+       private SSLEngineResult sslEngineResult;        // results from sslEngine last operation
+    private ByteBuffer myAppData;                              // clear text message to be sent
+    private ByteBuffer myNetData;                      // encrypted message to be sent
+    private ByteBuffer peerAppData;                            // clear text message received from the switch
+    private ByteBuffer peerNetData;                    // encrypted message from the switch
+
+    public SecureMessageReadWriteService(SocketChannel socket, Selector selector) throws Exception {
+       this.socket = socket;
+       this.selector = selector;
+       this.factory = new BasicFactory();
+
+       createSecureChannel(socket);
+       createBuffers(sslEngine);
+    }
+
+       /**
+        * Bring up secure channel using SSL Engine
+        * 
+        * @param socket TCP socket channel
+        * @throws Exception
+        */
+    private void createSecureChannel(SocketChannel socket) throws Exception {
+       String keyStoreFile = System.getProperty("controllerKeyStore");
+       String keyStorePassword = System.getProperty("controllerKeyStorePassword");
+       String trustStoreFile = System.getProperty("controllerTrustStore");;
+       String trustStorePassword = System.getProperty("controllerTrustStorePassword");;
+
+        KeyStore ks = KeyStore.getInstance("JKS");
+        KeyStore ts = KeyStore.getInstance("JKS");
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+        ks.load(new FileInputStream(keyStoreFile), keyStorePassword.toCharArray());
+        ts.load(new FileInputStream(trustStoreFile), trustStorePassword.toCharArray());
+        kmf.init(ks, keyStorePassword.toCharArray());
+        tmf.init(ts);
+
+        SecureRandom random = new SecureRandom();
+        random.nextInt();
+
+       SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random);
+       sslEngine = sslContext.createSSLEngine();
+       sslEngine.setUseClientMode(false);
+       sslEngine.setNeedClientAuth(true);
+       
+       // Do initial handshake
+       doHandshake(socket, sslEngine);
+       
+        this.clientSelectionKey = this.socket.register(this.selector,
+                SelectionKey.OP_READ);
+    }
+
+       /**
+        * Sends the OF message out over the socket channel. The message is
+        * encrypted by SSL Engine.
+        * 
+        * @param msg OF message to be sent
+        * @throws Exception
+        */
+    @Override
+    public void asyncSend(OFMessage msg) throws Exception {
+       synchronized (myAppData) {
+               int msgLen = msg.getLengthU();
+               if (myAppData.remaining() < msgLen) {
+                       // increase the buffer size so that it can contain this message
+                       ByteBuffer newBuffer = ByteBuffer.allocateDirect(myAppData
+                                       .capacity()
+                                       + msgLen);
+                       myAppData.flip();
+                       newBuffer.put(myAppData);
+                       myAppData = newBuffer;
+               }
+               msg.writeTo(myAppData);
+               myAppData.flip();
+               sslEngineResult = sslEngine.wrap(myAppData, myNetData);
+               logger.trace("asyncSend sslEngine wrap: {}", sslEngineResult);
+               runDelegatedTasks(sslEngineResult, sslEngine);
+
+               if (!socket.isOpen()) {
+                       return;
+               }
+
+               myNetData.flip();
+               socket.write(myNetData);
+               if (myNetData.hasRemaining()) {
+                       myNetData.compact();
+               } else {
+                       myNetData.clear();
+               }
+
+               if (myAppData.hasRemaining()) {
+                       myAppData.compact();
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_WRITE, this);
+               } else {
+                       myAppData.clear();
+                       this.clientSelectionKey = this.socket.register(
+                                       this.selector, SelectionKey.OP_READ, this);
+               }
+
+               logger.trace("Message sent: {}", msg.toString());
+       }
+    }
+
+       /**
+        * Resumes sending the remaining messages in the outgoing buffer
+        * @throws Exception
+        */
+    @Override
+    public void resumeSend() throws Exception {
+               synchronized (myAppData) {
+                       myAppData.flip();
+                       sslEngineResult = sslEngine.wrap(myAppData, myNetData);
+                       logger.trace("resumeSend sslEngine wrap: {}", sslEngineResult);
+                       runDelegatedTasks(sslEngineResult, sslEngine);
+
+                       if (!socket.isOpen()) {
+                               return;
+                       }
+
+                       myNetData.flip();
+                       socket.write(myNetData);
+                       if (myNetData.hasRemaining()) {
+                               myNetData.compact();
+                       } else {
+                               myNetData.clear();
+                       }
+
+                       if (myAppData.hasRemaining()) {
+                               myAppData.compact();
+                               this.clientSelectionKey = this.socket.register(this.selector,
+                                               SelectionKey.OP_WRITE, this);
+                       } else {
+                               myAppData.clear();
+                               this.clientSelectionKey = this.socket.register(this.selector,
+                                               SelectionKey.OP_READ, this);
+                       }
+               }
+    }
+
+       /**
+        * Reads the incoming network data from the socket, decryptes them and then
+        * retrieves the OF messages.
+        * 
+        * @return list of OF messages
+        * @throws Exception
+        */
+    @Override
+    public List<OFMessage> readMessages() throws Exception {
+               if (!socket.isOpen()) {
+                       return null;
+               }
+
+               List<OFMessage> msgs = null;
+        int bytesRead = -1;
+       int countDown = 50;             
+
+       bytesRead = socket.read(peerNetData);
+       if (bytesRead < 0) {
+                       throw new AsynchronousCloseException();
+       }
+
+       do {                    
+               peerNetData.flip();
+               sslEngineResult = sslEngine.unwrap(peerNetData, peerAppData);
+               if (peerNetData.hasRemaining()) {
+                       peerNetData.compact();
+               } else {
+                       peerNetData.clear();
+               }
+               logger.trace("sslEngine unwrap result: {}", sslEngineResult);
+               runDelegatedTasks(sslEngineResult, sslEngine);
+       } while ((sslEngineResult.getStatus() == SSLEngineResult.Status.OK) &&
+                         peerNetData.hasRemaining() && (--countDown > 0));
+       
+       if (countDown == 0) {
+               logger.trace("countDown reaches 0. peerNetData pos {} lim {}", peerNetData.position(), peerNetData.limit());
+       }
+
+       peerAppData.flip();
+       msgs = factory.parseMessages(peerAppData);
+       if (peerAppData.hasRemaining()) {
+               peerAppData.compact();
+       } else {
+               peerAppData.clear();
+       }
+
+       this.clientSelectionKey = this.socket.register(
+                       this.selector, SelectionKey.OP_READ, this);
+        
+        return msgs;
+    }
+
+    /**
+     *  If the result indicates that we have outstanding tasks to do,
+     *  go ahead and run them in this thread.
+     */
+    private void runDelegatedTasks(SSLEngineResult result,
+               SSLEngine engine) throws Exception {
+
+       if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+               Runnable runnable;
+               while ((runnable = engine.getDelegatedTask()) != null) {
+                       logger.debug("\trunning delegated task...");
+                       runnable.run();
+               }
+               HandshakeStatus hsStatus = engine.getHandshakeStatus();
+               if (hsStatus == HandshakeStatus.NEED_TASK) {
+                       throw new Exception(
+                                       "handshake shouldn't need additional tasks");
+               }
+               logger.debug("\tnew HandshakeStatus: {}", hsStatus);
+       }
+    }
+
+    private void doHandshake(SocketChannel socket, SSLEngine engine) throws Exception {
+       SSLSession session = engine.getSession();
+       ByteBuffer myAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       ByteBuffer peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       ByteBuffer myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+       ByteBuffer peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+
+       // Begin handshake
+       engine.beginHandshake();
+       SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus();
+
+       // Process handshaking message
+       while (hs != SSLEngineResult.HandshakeStatus.FINISHED &&
+                  hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+               switch (hs) {
+               case NEED_UNWRAP:
+                       // Receive handshaking data from peer
+                       if (socket.read(peerNetData) < 0) {
+                               throw new AsynchronousCloseException();
+                       }
+
+                       // Process incoming handshaking data
+                       peerNetData.flip();
+                       SSLEngineResult res = engine.unwrap(peerNetData, peerAppData);
+                       peerNetData.compact();
+                       hs = res.getHandshakeStatus();
+
+                       // Check status
+                       switch (res.getStatus()) {
+                       case OK :
+                               // Handle OK status
+                               break;
+                       }
+                       break;
+
+               case NEED_WRAP :
+                       // Empty the local network packet buffer.
+                       myNetData.clear();
+
+                       // Generate handshaking data
+                       res = engine.wrap(myAppData, myNetData);
+                       hs = res.getHandshakeStatus();
+
+                       // Check status
+                       switch (res.getStatus()) {
+                       case OK :
+                               myNetData.flip();
+
+                               // Send the handshaking data to peer
+                               while (myNetData.hasRemaining()) {
+                                       if (socket.write(myNetData) < 0) {
+                                       throw new AsynchronousCloseException();
+                                       }
+                               }
+                               break;
+                       }
+                       break;
+
+               case NEED_TASK :
+                       // Handle blocking tasks
+                       Runnable runnable;
+                       while ((runnable = engine.getDelegatedTask()) != null) {
+                               logger.debug("\trunning delegated task...");
+                               runnable.run();
+                       }
+                       hs = engine.getHandshakeStatus();
+                       if (hs == HandshakeStatus.NEED_TASK) {
+                               throw new Exception(
+                                               "handshake shouldn't need additional tasks");
+                       }
+                       logger.debug("\tnew HandshakeStatus: {}", hs);
+                       break;
+               }
+       }
+    }
+    
+    private void createBuffers(SSLEngine engine) {
+       SSLSession session = engine.getSession();
+       this.myAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       this.peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+       this.myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+       this.peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+    }
+}
index 8881fb53640caa58e692a08bdbdbb7bbb80916e6..45203758bdd45e13486936faab73c27f947f547d 100644 (file)
@@ -9,12 +9,13 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 
-import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,11 +29,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
 import org.openflow.protocol.OFBarrierReply;
 import org.openflow.protocol.OFEchoReply;
 import org.openflow.protocol.OFError;
@@ -63,7 +66,6 @@ public class SwitchHandler implements ISwitch {
     private static final int SWITCH_LIVENESS_TIMER = 5000;
     private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
     private int MESSAGE_RESPONSE_TIMER = 2000;
-    private static final int bufferSize = 1024 * 1024;
 
     private String instanceName;
     private ISwitch thisISwitch;
@@ -74,10 +76,7 @@ public class SwitchHandler implements ISwitch {
     private Byte tables;
     private Integer actions;
     private Selector selector;
-    private SelectionKey clientSelectionKey;
     private SocketChannel socket;
-    private ByteBuffer inBuffer;
-    private ByteBuffer outBuffer;
     private BasicFactory factory;
     private AtomicInteger xid;
     private SwitchState state;
@@ -90,9 +89,12 @@ public class SwitchHandler implements ISwitch {
     private ExecutorService executor;
     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
     private boolean running;
+    private IMessageReadWrite msgReadWriteService;
     private Thread switchHandlerThread;
     private Integer responseTimerValue;
-
+       private PriorityBlockingQueue<PriorityMessage> transmitQ;
+    private Thread transmitThread;
+    
     private enum SwitchState {
         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
                 3);
@@ -130,44 +132,37 @@ public class SwitchHandler implements ISwitch {
         this.periodicTimer = null;
         this.executor = Executors.newFixedThreadPool(4);
         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
-        this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
-        this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
         String rTimer = System.getProperty("of.messageResponseTimer");
         if (rTimer != null) {
-            try {
-                responseTimerValue = Integer.decode(rTimer);
-            } catch (NumberFormatException e) {
-                logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default("
-                        + MESSAGE_RESPONSE_TIMER+ ")");
-            }
+               try {
+                       responseTimerValue = Integer.decode(rTimer);
+               } catch (NumberFormatException e) {
+                               logger.warn("Invalid of.messageResponseTimer: {} use default({})",
+                                               rTimer, MESSAGE_RESPONSE_TIMER);
+               }
         }
-    }
+       }
 
     public void start() {
         try {
-            this.selector = SelectorProvider.provider().openSelector();
-            this.socket.configureBlocking(false);
-            this.socket.socket().setTcpNoDelay(true);
-            this.clientSelectionKey = this.socket.register(this.selector,
-                    SelectionKey.OP_READ);
+               startTransmitThread();
+               setupCommChannel();
+               sendFirstHello();
             startHandlerThread();
         } catch (Exception e) {
             reportError(e);
-            return;
         }
     }
 
     private void startHandlerThread() {
-        OFMessage msg = factory.getMessage(OFType.HELLO);
-        asyncSend(msg);
         switchHandlerThread = new Thread(new Runnable() {
             @Override
             public void run() {
                 running = true;
                 while (running) {
                     try {
-                        // wait for an incoming connection
+                       // wait for an incoming connection
                         selector.select(0);
                         Iterator<SelectionKey> selectedKeys = selector
                                 .selectedKeys().iterator();
@@ -195,7 +190,7 @@ public class SwitchHandler implements ISwitch {
             running = false;
             selector.wakeup();
             cancelSwitchTimer();
-            this.clientSelectionKey.cancel();
+            this.selector.close();
             this.socket.close();
             executor.shutdown();
         } catch (Exception e) {
@@ -209,77 +204,94 @@ public class SwitchHandler implements ISwitch {
         return this.xid.incrementAndGet();
     }
 
+       /**
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. An unique XID is generated automatically and
+        * inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
     @Override
     public Integer asyncSend(OFMessage msg) {
-        return asyncSend(msg, getNextXid());
+       return asyncSend(msg, getNextXid());
+    }
+
+       /**
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be Sent
+        * @param xid The XID to be used in the message
+        * @return The XID used
+        */
+    @Override
+    public Integer asyncSend(OFMessage msg, int xid) {
+       msg.setXid(xid);
+       transmitQ.add(new PriorityMessage(msg, 0));
+        return xid;
     }
 
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. An unique XID is generated automatically and inserted into the
+        * message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
     @Override
-    public Integer asyncSend(OFMessage msg, int xid) {
-        synchronized (outBuffer) {
-            /*
-            if ((msg.getType() != OFType.ECHO_REQUEST) &&
-                       (msg.getType() != OFType.ECHO_REPLY)) {
-               logger.debug("sending " + msg.getType().toString() + " to " + toString());
-            }
-             */
-            msg.setXid(xid);
-            int msgLen = msg.getLengthU();
-            if (outBuffer.remaining() < msgLen) {
-                // increase the buffer size so that it can contain this message
-                ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
-                        .capacity()
-                        + msgLen);
-                outBuffer.flip();
-                newBuffer.put(outBuffer);
-                outBuffer = newBuffer;
-            }
-            msg.writeTo(outBuffer);
-            outBuffer.flip();
-            try {
-                socket.write(outBuffer);
-                outBuffer.compact();
-                if (outBuffer.position() > 0) {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_WRITE, this);
-                }
-                logger.trace("Message sent: " + msg.toString());
-            } catch (Exception e) {
-                reportError(e);
-            }
-        }
+    public Integer asyncFastSend(OFMessage msg) {
+       return asyncFastSend(msg, getNextXid());
+    }
+
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
+    @Override
+    public Integer asyncFastSend(OFMessage msg, int xid) {
+       msg.setXid(xid);
+       transmitQ.add(new PriorityMessage(msg, 1));
         return xid;
     }
 
-    public void resumeSend() {
-        synchronized (outBuffer) {
-            try {
-                outBuffer.flip();
-                socket.write(outBuffer);
-                outBuffer.compact();
-                if (outBuffer.position() > 0) {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_WRITE, this);
-                } else {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_READ, this);
-                }
-            } catch (Exception e) {
-                reportError(e);
-            }
-        }
+   public void resumeSend() {
+        try {
+                       msgReadWriteService.resumeSend();
+               } catch (Exception e) {
+                       reportError(e);
+               }
     }
 
     public void handleMessages() {
-        List<OFMessage> msgs = readMessages();
+        List<OFMessage> msgs = null;
+        
+        try {
+               msgs = msgReadWriteService.readMessages();
+               } catch (Exception e) {
+                       reportError(e);
+               }
+               
         if (msgs == null) {
-            logger.debug(toString() + " is down");
+            logger.debug("{} is down", toString());
             // the connection is down, inform core
             reportSwitchStateChange(false);
             return;
         }
         for (OFMessage msg : msgs) {
-            logger.trace("Message received: " + msg.toString());
+            logger.trace("Message received: {}", msg.toString());
             /*
             if  ((msg.getType() != OFType.ECHO_REQUEST) &&
                        (msg.getType() != OFType.ECHO_REPLY)) {
@@ -293,7 +305,7 @@ public class SwitchHandler implements ISwitch {
                 // send feature request
                 OFMessage featureRequest = factory
                         .getMessage(OFType.FEATURES_REQUEST);
-                asyncSend(featureRequest);
+                asyncFastSend(featureRequest);
                 // delete all pre-existing flows
                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
                 OFFlowMod flowMod = (OFFlowMod) factory
@@ -301,14 +313,14 @@ public class SwitchHandler implements ISwitch {
                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
                         .setOutPort(OFPort.OFPP_NONE).setLength(
                                 (short) OFFlowMod.MINIMUM_LENGTH);
-                asyncSend(flowMod);
+                asyncFastSend(flowMod);
                 this.state = SwitchState.WAIT_FEATURES_REPLY;
                 startSwitchTimer();
                 break;
             case ECHO_REQUEST:
                 OFEchoReply echoReply = (OFEchoReply) factory
                         .getMessage(OFType.ECHO_REPLY);
-                asyncSend(echoReply);
+                asyncFastSend(echoReply);
                 break;
             case ECHO_REPLY:
                 this.probeSent = false;
@@ -362,28 +374,6 @@ public class SwitchHandler implements ISwitch {
 
     }
 
-    private List<OFMessage> readMessages() {
-        List<OFMessage> msgs = null;
-        int bytesRead;
-        try {
-            bytesRead = socket.read(inBuffer);
-        } catch (Exception e) {
-            reportError(e);
-            return null;
-        }
-        if (bytesRead == -1) {
-            return null;
-        }
-        inBuffer.flip();
-        msgs = factory.parseMessages(inBuffer);
-        if (inBuffer.hasRemaining()) {
-            inBuffer.compact();
-        } else {
-            inBuffer.clear();
-        }
-        return msgs;
-    }
-
     private void startSwitchTimer() {
         this.periodicTimer = new Timer();
         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
@@ -394,8 +384,7 @@ public class SwitchHandler implements ISwitch {
                     if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
                         if (probeSent) {
                             // switch failed to respond to our probe, consider it down
-                            logger.warn(toString()
-                                    + " is idle for too long, disconnect");
+                            logger.warn("{} is idle for too long, disconnect", toString());
                             reportSwitchStateChange(false);
                         } else {
                             // send a probe to see if the switch is still alive
@@ -403,14 +392,14 @@ public class SwitchHandler implements ISwitch {
                             probeSent = true;
                             OFMessage echo = factory
                                     .getMessage(OFType.ECHO_REQUEST);
-                            asyncSend(echo);
+                            asyncFastSend(echo);
                         }
                     } else {
                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
                             // send another features request
                             OFMessage request = factory
                                     .getMessage(OFType.FEATURES_REQUEST);
-                            asyncSend(request);
+                            asyncFastSend(request);
                         } else {
                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
                                 //  send another config request
@@ -418,10 +407,10 @@ public class SwitchHandler implements ISwitch {
                                         .getMessage(OFType.SET_CONFIG);
                                 config.setMissSendLength((short) 0xffff)
                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
-                                asyncSend(config);
+                                asyncFastSend(config);
                                 OFMessage getConfig = factory
                                         .getMessage(OFType.GET_CONFIG_REQUEST);
-                                asyncSend(getConfig);
+                                asyncFastSend(getConfig);
                             }
                         }
                     }
@@ -439,8 +428,8 @@ public class SwitchHandler implements ISwitch {
     }
 
     private void reportError(Exception e) {
-        //logger.error(toString() + " caught Error " + e.toString());
-        // notify core of this error event
+        logger.debug("Caught exception ", e);
+        // notify core of this error event and disconnect the switch
         ((Controller) core).takeSwitchEventError(this);
     }
 
@@ -473,10 +462,10 @@ public class SwitchHandler implements ISwitch {
                     .getMessage(OFType.SET_CONFIG);
             config.setMissSendLength((short) 0xffff).setLengthU(
                     OFSetConfig.MINIMUM_LENGTH);
-            asyncSend(config);
+            asyncFastSend(config);
             // send config request to make sure the switch can handle the set config
             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
-            asyncSend(getConfig);
+            asyncFastSend(getConfig);
             this.state = SwitchState.WAIT_CONFIG_REPLY;
             // inform core that a new switch is now operational
             reportSwitchStateChange(true);
@@ -545,8 +534,7 @@ public class SwitchHandler implements ISwitch {
                     .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for " + req.getType()
-                    + " replies");
+            logger.warn("Timeout while waiting for {} replies", req.getType());
             result = null; // to indicate timeout has occurred
             return result;
         }
@@ -573,13 +561,12 @@ public class SwitchHandler implements ISwitch {
             } else {
                 // if result  is not null, this means the switch can't handle this message
                 // the result if OFError already
-                logger.debug("Send " + msg.getType().toString()
-                        + " failed --> " + ((OFError) result).toString());
+                logger.debug("Send {} failed --> {}", 
+                               msg.getType().toString(), ((OFError) result).toString());
             }
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for " + msg.getType().toString()
-                    + " reply");
+            logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
             // convert the result into a Boolean with value false
             status = false;
             result = status;
@@ -637,7 +624,7 @@ public class SwitchHandler implements ISwitch {
             worker.wakeup();
         }
     }
-
+    
     @Override
     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
         return this.physicalPorts;
@@ -715,4 +702,65 @@ public class SwitchHandler implements ISwitch {
         }
         return result;
     }
+
+       /*
+        * Transmit thread polls the message out of the priority queue and invokes
+        * messaging service to transmit it over the socket channel
+        */
+    class PriorityMessageTransmit implements Runnable {
+        public void run() {
+            while (true) {
+               try {
+                       if (!transmitQ.isEmpty()) {
+                               PriorityMessage pmsg = transmitQ.poll();
+                               msgReadWriteService.asyncSend(pmsg.msg);
+                               logger.trace("Message sent: {}", pmsg.toString());
+                       }
+                       Thread.sleep(10);
+               } catch (Exception e) {
+                       reportError(e);
+               }
+            }
+        }
+    }
+
+    /*
+     * Setup and start the transmit thread
+     */
+    private void startTransmitThread() {       
+        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, 
+                               new Comparator<PriorityMessage>() {
+                                       public int compare(PriorityMessage p1, PriorityMessage p2) {
+                                               return p2.priority - p1.priority;
+                                       }
+                               });
+        this.transmitThread = new Thread(new PriorityMessageTransmit());
+        this.transmitThread.start();
+    }
+    
+    /*
+     * Setup communication services
+     */
+    private void setupCommChannel() throws Exception {
+        this.selector = SelectorProvider.provider().openSelector();
+        this.socket.configureBlocking(false);
+        this.socket.socket().setTcpNoDelay(true);        
+        this.msgReadWriteService = getMessageReadWriteService();
+    }
+
+    private void sendFirstHello() {
+       try {
+               OFMessage msg = factory.getMessage(OFType.HELLO);
+               asyncFastSend(msg);
+       } catch (Exception e) {
+               reportError(e);
+       }
+    }
+    
+    private IMessageReadWrite getMessageReadWriteService() throws Exception {
+       String str = System.getProperty("secureChannelEnabled");
+        return ((str != null) && (str.equalsIgnoreCase("true"))) ? 
+                       new SecureMessageReadWriteService(socket, selector) : 
+                       new MessageReadWriteService(socket, selector);
+    }
 }