From f2344025e13ee3e51561bb171800d240f9f91e9a Mon Sep 17 00:00:00 2001 From: Jason Ye Date: Fri, 5 Apr 2013 16:58:47 -0700 Subject: [PATCH] Add TLS support in the Opendaylight Controller: - TLS configuration is specified in file ./configuration/config.ini. To enable the TLS feature, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files. The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate entries, including switches' Certification Authority (CA) certificates. Here is the sample configuration, secureChannelEnabled=true controllerKeyStore=./configuration/ctlKeyStore controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation) controllerTrustStore=./configuration/ctlTrustStore controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation) - Added two message read/write services, one for clear text, one for secure communication. - Added priority queue for message transmission. The system critical messages, such as Hello, Echo Reply etc will be treated as high priority and will be served ahead of other messages, like statistics request, discovery packets etc. Signed-off-by: Jason Ye --- .../main/resources/configuration/config.ini | 16 + .../main/resources/configuration/logback.xml | 3 + .../protocol_plugins/openflow/pom.xml | 3 +- .../openflow/core/IMessageReadWrite.java | 45 +++ .../openflow/core/ISwitch.java | 38 +- .../internal/MessageReadWriteService.java | 142 +++++++ .../core/internal/PriorityMessage.java | 59 +++ .../SecureMessageReadWriteService.java | 346 ++++++++++++++++++ .../openflow/core/internal/SwitchHandler.java | 290 +++++++++------ 9 files changed, 817 insertions(+), 125 deletions(-) create mode 100644 opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java create mode 100644 opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java create mode 100644 opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java create mode 100644 opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini b/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini index 1b90fc7798..5cababb85b 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini +++ b/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini @@ -48,3 +48,19 @@ org.eclipse.gemini.web.tomcat.config.path=configuration/tomcat-server.xml # of.listenPort=6633 # The time (in milliseconds) the controller will wait for a response after sending a Barrier Request or a Statistic Request message (default 2000 msec) # of.messageResponseTimer=2000 + +# TLS configuration +# To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files. +# The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate +# entries, including switches' Certification Authority (CA) certificates. For example, +# secureChannelEnabled=true +# controllerKeyStore=./configuration/ctlKeyStore +# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation) +# controllerTrustStore=./configuration/ctlTrustStore +# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation) + +secureChannelEnabled=false +controllerKeyStore= +controllerKeyStorePassword= +controllerTrustStore= +controllerTrustStorePassword= diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/logback.xml b/opendaylight/distribution/opendaylight/src/main/resources/configuration/logback.xml index 3c3f78889f..a99d17d37c 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/logback.xml +++ b/opendaylight/distribution/opendaylight/src/main/resources/configuration/logback.xml @@ -36,6 +36,9 @@ + + + diff --git a/opendaylight/protocol_plugins/openflow/pom.xml b/opendaylight/protocol_plugins/openflow/pom.xml index 418c897b52..614b0ff818 100644 --- a/opendaylight/protocol_plugins/openflow/pom.xml +++ b/opendaylight/protocol_plugins/openflow/pom.xml @@ -38,7 +38,8 @@ org.apache.felix.dm, org.slf4j, org.eclipse.osgi.framework.console, - org.osgi.framework + org.osgi.framework, + javax.net.ssl org.opendaylight.controller.protocol_plugin.openflow.internal diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java new file mode 100644 index 0000000000..301159d6d3 --- /dev/null +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/IMessageReadWrite.java @@ -0,0 +1,45 @@ + +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.protocol_plugin.openflow.core; + +import java.util.List; + +import org.openflow.protocol.OFMessage; + +/** + * This interface defines low level routines to read/write messages on an open + * socket channel. If secure communication is desired, these methods also perform + * encryption and decryption of the network data. + */ +public interface IMessageReadWrite { + /** + * Sends the OF message out over the socket channel. For secure + * communication, the data will be encrypted. + * + * @param msg OF message to be sent + * @throws Exception + */ + public void asyncSend(OFMessage msg) throws Exception; + + /** + * Resumes sending the remaining messages in the outgoing buffer + * @throws Exception + */ + public void resumeSend() throws Exception; + + /** + * Reads the incoming network data from the socket and retrieves the OF + * messages. For secure communication, the data will be decrypted first. + * + * @return list of OF messages + * @throws Exception + */ + public List readMessages() throws Exception; +} diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java index 0a4560ee0f..a15c2e5c71 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java @@ -66,20 +66,52 @@ public interface ISwitch { public Date getConnectedDate(); /** - * Sends the message. A unique XID is generated automatically and inserted into the message. - * @param msg TheOF message to be sent + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. An unique XID is generated automatically and + * inserted into the message. + * + * @param msg The OF message to be sent * @return The XID used */ public Integer asyncSend(OFMessage msg); /** - * Sends the message with the specified XID. + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. The specified XID is inserted into the message. + * * @param msg The OF message to be Sent * @param xid The XID to be used in the message * @return The XID used */ public Integer asyncSend(OFMessage msg, int xid); + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. An unique XID is generated automatically and inserted into the + * message. + * + * @param msg The OF message to be sent + * @return The XID used + */ + public Integer asyncFastSend(OFMessage msg); + + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. The specified XID is inserted into the message. + * + * @param msg The OF message to be sent + * @return The XID used + */ + public Integer asyncFastSend(OFMessage msg, int xid); + /** * Sends the OF message followed by a Barrier Request with a unique XID which is automatically generated, * and waits for a result from the switch. diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java new file mode 100644 index 0000000000..d20bf1e0a0 --- /dev/null +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java @@ -0,0 +1,142 @@ + +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.protocol_plugin.openflow.core.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.List; + +import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; +import org.openflow.protocol.OFMessage; +import org.openflow.protocol.factory.BasicFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements methods to read/write messages over an established + * socket channel. The data exchange is in clear text format. + */ +public class MessageReadWriteService implements IMessageReadWrite { + private static final Logger logger = LoggerFactory + .getLogger(MessageReadWriteService.class); + private static final int bufferSize = 1024 * 1024; + + private Selector selector; + private SelectionKey clientSelectionKey; + private SocketChannel socket; + private ByteBuffer inBuffer; + private ByteBuffer outBuffer; + private BasicFactory factory; + + public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException { + this.socket = socket; + this.selector = selector; + this.factory = new BasicFactory(); + this.inBuffer = ByteBuffer.allocateDirect(bufferSize); + this.outBuffer = ByteBuffer.allocateDirect(bufferSize); + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_READ); + } + + /** + * Sends the OF message out over the socket channel. + * + * @param msg OF message to be sent + * @throws Exception + */ + @Override + public void asyncSend(OFMessage msg) throws IOException { + synchronized (outBuffer) { + int msgLen = msg.getLengthU(); + if (outBuffer.remaining() < msgLen) { + // increase the buffer size so that it can contain this message + ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer + .capacity() + + msgLen); + outBuffer.flip(); + newBuffer.put(outBuffer); + outBuffer = newBuffer; + } + msg.writeTo(outBuffer); + + if (!socket.isOpen()) { + return; + } + + outBuffer.flip(); + socket.write(outBuffer); + outBuffer.compact(); + if (outBuffer.position() > 0) { + this.clientSelectionKey = this.socket.register( + this.selector, SelectionKey.OP_WRITE, this); + } + logger.trace("Message sent: {}", msg.toString()); + } + } + + /** + * Resumes sending the remaining messages in the outgoing buffer + * @throws Exception + */ + @Override + public void resumeSend() throws IOException { + synchronized (outBuffer) { + if (!socket.isOpen()) { + return; + } + + outBuffer.flip(); + socket.write(outBuffer); + outBuffer.compact(); + if (outBuffer.position() > 0) { + this.clientSelectionKey = this.socket.register( + this.selector, SelectionKey.OP_WRITE, this); + } else { + this.clientSelectionKey = this.socket.register( + this.selector, SelectionKey.OP_READ, this); + } + } + } + + /** + * Reads the incoming network data from the socket and retrieves the OF + * messages. + * + * @return list of OF messages + * @throws Exception + */ + @Override + public List readMessages() throws IOException { + if (!socket.isOpen()) { + return null; + } + + List msgs = null; + int bytesRead = -1; + bytesRead = socket.read(inBuffer); + if (bytesRead < 0) { + throw new AsynchronousCloseException(); + } + + inBuffer.flip(); + msgs = factory.parseMessages(inBuffer); + if (inBuffer.hasRemaining()) { + inBuffer.compact(); + } else { + inBuffer.clear(); + } + return msgs; + } +} diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java new file mode 100644 index 0000000000..6bc4f1083b --- /dev/null +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java @@ -0,0 +1,59 @@ + +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.protocol_plugin.openflow.core.internal; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.openflow.protocol.OFMessage; + +/** + * This class describes an OpenFlow message with priority + */ +class PriorityMessage { + OFMessage msg; + int priority; + + public PriorityMessage(OFMessage msg, int priority) { + this.msg = msg; + this.priority = priority; + } + + public OFMessage getMsg() { + return msg; + } + + public void setMsg(OFMessage msg) { + this.msg = msg; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + @Override + public int hashCode() { + return HashCodeBuilder.reflectionHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return EqualsBuilder.reflectionEquals(this, obj); + } + + @Override + public String toString() { + return "PriorityMessage[" + ReflectionToStringBuilder.toString(this) + "]"; + } +} diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java new file mode 100644 index 0000000000..627e22337b --- /dev/null +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java @@ -0,0 +1,346 @@ + +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.protocol_plugin.openflow.core.internal; + +import java.io.FileInputStream; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.List; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; +import org.openflow.protocol.OFMessage; +import org.openflow.protocol.factory.BasicFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements methods to read/write messages over an established + * socket channel. The data exchange is encrypted/decrypted by SSLEngine. + */ +public class SecureMessageReadWriteService implements IMessageReadWrite { + private static final Logger logger = LoggerFactory + .getLogger(SecureMessageReadWriteService.class); + + private Selector selector; + private SelectionKey clientSelectionKey; + private SocketChannel socket; + private BasicFactory factory; + + private SSLEngine sslEngine; + private SSLEngineResult sslEngineResult; // results from sslEngine last operation + private ByteBuffer myAppData; // clear text message to be sent + private ByteBuffer myNetData; // encrypted message to be sent + private ByteBuffer peerAppData; // clear text message received from the switch + private ByteBuffer peerNetData; // encrypted message from the switch + + public SecureMessageReadWriteService(SocketChannel socket, Selector selector) throws Exception { + this.socket = socket; + this.selector = selector; + this.factory = new BasicFactory(); + + createSecureChannel(socket); + createBuffers(sslEngine); + } + + /** + * Bring up secure channel using SSL Engine + * + * @param socket TCP socket channel + * @throws Exception + */ + private void createSecureChannel(SocketChannel socket) throws Exception { + String keyStoreFile = System.getProperty("controllerKeyStore"); + String keyStorePassword = System.getProperty("controllerKeyStorePassword"); + String trustStoreFile = System.getProperty("controllerTrustStore");; + String trustStorePassword = System.getProperty("controllerTrustStorePassword");; + + KeyStore ks = KeyStore.getInstance("JKS"); + KeyStore ts = KeyStore.getInstance("JKS"); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + ks.load(new FileInputStream(keyStoreFile), keyStorePassword.toCharArray()); + ts.load(new FileInputStream(trustStoreFile), trustStorePassword.toCharArray()); + kmf.init(ks, keyStorePassword.toCharArray()); + tmf.init(ts); + + SecureRandom random = new SecureRandom(); + random.nextInt(); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random); + sslEngine = sslContext.createSSLEngine(); + sslEngine.setUseClientMode(false); + sslEngine.setNeedClientAuth(true); + + // Do initial handshake + doHandshake(socket, sslEngine); + + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_READ); + } + + /** + * Sends the OF message out over the socket channel. The message is + * encrypted by SSL Engine. + * + * @param msg OF message to be sent + * @throws Exception + */ + @Override + public void asyncSend(OFMessage msg) throws Exception { + synchronized (myAppData) { + int msgLen = msg.getLengthU(); + if (myAppData.remaining() < msgLen) { + // increase the buffer size so that it can contain this message + ByteBuffer newBuffer = ByteBuffer.allocateDirect(myAppData + .capacity() + + msgLen); + myAppData.flip(); + newBuffer.put(myAppData); + myAppData = newBuffer; + } + msg.writeTo(myAppData); + myAppData.flip(); + sslEngineResult = sslEngine.wrap(myAppData, myNetData); + logger.trace("asyncSend sslEngine wrap: {}", sslEngineResult); + runDelegatedTasks(sslEngineResult, sslEngine); + + if (!socket.isOpen()) { + return; + } + + myNetData.flip(); + socket.write(myNetData); + if (myNetData.hasRemaining()) { + myNetData.compact(); + } else { + myNetData.clear(); + } + + if (myAppData.hasRemaining()) { + myAppData.compact(); + this.clientSelectionKey = this.socket.register( + this.selector, SelectionKey.OP_WRITE, this); + } else { + myAppData.clear(); + this.clientSelectionKey = this.socket.register( + this.selector, SelectionKey.OP_READ, this); + } + + logger.trace("Message sent: {}", msg.toString()); + } + } + + /** + * Resumes sending the remaining messages in the outgoing buffer + * @throws Exception + */ + @Override + public void resumeSend() throws Exception { + synchronized (myAppData) { + myAppData.flip(); + sslEngineResult = sslEngine.wrap(myAppData, myNetData); + logger.trace("resumeSend sslEngine wrap: {}", sslEngineResult); + runDelegatedTasks(sslEngineResult, sslEngine); + + if (!socket.isOpen()) { + return; + } + + myNetData.flip(); + socket.write(myNetData); + if (myNetData.hasRemaining()) { + myNetData.compact(); + } else { + myNetData.clear(); + } + + if (myAppData.hasRemaining()) { + myAppData.compact(); + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_WRITE, this); + } else { + myAppData.clear(); + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_READ, this); + } + } + } + + /** + * Reads the incoming network data from the socket, decryptes them and then + * retrieves the OF messages. + * + * @return list of OF messages + * @throws Exception + */ + @Override + public List readMessages() throws Exception { + if (!socket.isOpen()) { + return null; + } + + List msgs = null; + int bytesRead = -1; + int countDown = 50; + + bytesRead = socket.read(peerNetData); + if (bytesRead < 0) { + throw new AsynchronousCloseException(); + } + + do { + peerNetData.flip(); + sslEngineResult = sslEngine.unwrap(peerNetData, peerAppData); + if (peerNetData.hasRemaining()) { + peerNetData.compact(); + } else { + peerNetData.clear(); + } + logger.trace("sslEngine unwrap result: {}", sslEngineResult); + runDelegatedTasks(sslEngineResult, sslEngine); + } while ((sslEngineResult.getStatus() == SSLEngineResult.Status.OK) && + peerNetData.hasRemaining() && (--countDown > 0)); + + if (countDown == 0) { + logger.trace("countDown reaches 0. peerNetData pos {} lim {}", peerNetData.position(), peerNetData.limit()); + } + + peerAppData.flip(); + msgs = factory.parseMessages(peerAppData); + if (peerAppData.hasRemaining()) { + peerAppData.compact(); + } else { + peerAppData.clear(); + } + + this.clientSelectionKey = this.socket.register( + this.selector, SelectionKey.OP_READ, this); + + return msgs; + } + + /** + * If the result indicates that we have outstanding tasks to do, + * go ahead and run them in this thread. + */ + private void runDelegatedTasks(SSLEngineResult result, + SSLEngine engine) throws Exception { + + if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + Runnable runnable; + while ((runnable = engine.getDelegatedTask()) != null) { + logger.debug("\trunning delegated task..."); + runnable.run(); + } + HandshakeStatus hsStatus = engine.getHandshakeStatus(); + if (hsStatus == HandshakeStatus.NEED_TASK) { + throw new Exception( + "handshake shouldn't need additional tasks"); + } + logger.debug("\tnew HandshakeStatus: {}", hsStatus); + } + } + + private void doHandshake(SocketChannel socket, SSLEngine engine) throws Exception { + SSLSession session = engine.getSession(); + ByteBuffer myAppData = ByteBuffer.allocate(session.getApplicationBufferSize()); + ByteBuffer peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize()); + ByteBuffer myNetData = ByteBuffer.allocate(session.getPacketBufferSize()); + ByteBuffer peerNetData = ByteBuffer.allocate(session.getPacketBufferSize()); + + // Begin handshake + engine.beginHandshake(); + SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus(); + + // Process handshaking message + while (hs != SSLEngineResult.HandshakeStatus.FINISHED && + hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) { + switch (hs) { + case NEED_UNWRAP: + // Receive handshaking data from peer + if (socket.read(peerNetData) < 0) { + throw new AsynchronousCloseException(); + } + + // Process incoming handshaking data + peerNetData.flip(); + SSLEngineResult res = engine.unwrap(peerNetData, peerAppData); + peerNetData.compact(); + hs = res.getHandshakeStatus(); + + // Check status + switch (res.getStatus()) { + case OK : + // Handle OK status + break; + } + break; + + case NEED_WRAP : + // Empty the local network packet buffer. + myNetData.clear(); + + // Generate handshaking data + res = engine.wrap(myAppData, myNetData); + hs = res.getHandshakeStatus(); + + // Check status + switch (res.getStatus()) { + case OK : + myNetData.flip(); + + // Send the handshaking data to peer + while (myNetData.hasRemaining()) { + if (socket.write(myNetData) < 0) { + throw new AsynchronousCloseException(); + } + } + break; + } + break; + + case NEED_TASK : + // Handle blocking tasks + Runnable runnable; + while ((runnable = engine.getDelegatedTask()) != null) { + logger.debug("\trunning delegated task..."); + runnable.run(); + } + hs = engine.getHandshakeStatus(); + if (hs == HandshakeStatus.NEED_TASK) { + throw new Exception( + "handshake shouldn't need additional tasks"); + } + logger.debug("\tnew HandshakeStatus: {}", hs); + break; + } + } + } + + private void createBuffers(SSLEngine engine) { + SSLSession session = engine.getSession(); + this.myAppData = ByteBuffer.allocate(session.getApplicationBufferSize()); + this.peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize()); + this.myNetData = ByteBuffer.allocate(session.getPacketBufferSize()); + this.peerNetData = ByteBuffer.allocate(session.getPacketBufferSize()); + } +} diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 8881fb5364..45203758bd 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -9,12 +9,13 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; -import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -28,11 +29,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.protocol_plugin.openflow.core.IController; import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; +import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; import org.openflow.protocol.OFBarrierReply; import org.openflow.protocol.OFEchoReply; import org.openflow.protocol.OFError; @@ -63,7 +66,6 @@ public class SwitchHandler implements ISwitch { private static final int SWITCH_LIVENESS_TIMER = 5000; private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500; private int MESSAGE_RESPONSE_TIMER = 2000; - private static final int bufferSize = 1024 * 1024; private String instanceName; private ISwitch thisISwitch; @@ -74,10 +76,7 @@ public class SwitchHandler implements ISwitch { private Byte tables; private Integer actions; private Selector selector; - private SelectionKey clientSelectionKey; private SocketChannel socket; - private ByteBuffer inBuffer; - private ByteBuffer outBuffer; private BasicFactory factory; private AtomicInteger xid; private SwitchState state; @@ -90,9 +89,12 @@ public class SwitchHandler implements ISwitch { private ExecutorService executor; private ConcurrentHashMap> messageWaitingDone; private boolean running; + private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; private Integer responseTimerValue; - + private PriorityBlockingQueue transmitQ; + private Thread transmitThread; + private enum SwitchState { NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL( 3); @@ -130,44 +132,37 @@ public class SwitchHandler implements ISwitch { this.periodicTimer = null; this.executor = Executors.newFixedThreadPool(4); this.messageWaitingDone = new ConcurrentHashMap>(); - this.inBuffer = ByteBuffer.allocateDirect(bufferSize); - this.outBuffer = ByteBuffer.allocateDirect(bufferSize); this.responseTimerValue = MESSAGE_RESPONSE_TIMER; String rTimer = System.getProperty("of.messageResponseTimer"); if (rTimer != null) { - try { - responseTimerValue = Integer.decode(rTimer); - } catch (NumberFormatException e) { - logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default(" - + MESSAGE_RESPONSE_TIMER+ ")"); - } + try { + responseTimerValue = Integer.decode(rTimer); + } catch (NumberFormatException e) { + logger.warn("Invalid of.messageResponseTimer: {} use default({})", + rTimer, MESSAGE_RESPONSE_TIMER); + } } - } + } public void start() { try { - this.selector = SelectorProvider.provider().openSelector(); - this.socket.configureBlocking(false); - this.socket.socket().setTcpNoDelay(true); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_READ); + startTransmitThread(); + setupCommChannel(); + sendFirstHello(); startHandlerThread(); } catch (Exception e) { reportError(e); - return; } } private void startHandlerThread() { - OFMessage msg = factory.getMessage(OFType.HELLO); - asyncSend(msg); switchHandlerThread = new Thread(new Runnable() { @Override public void run() { running = true; while (running) { try { - // wait for an incoming connection + // wait for an incoming connection selector.select(0); Iterator selectedKeys = selector .selectedKeys().iterator(); @@ -195,7 +190,7 @@ public class SwitchHandler implements ISwitch { running = false; selector.wakeup(); cancelSwitchTimer(); - this.clientSelectionKey.cancel(); + this.selector.close(); this.socket.close(); executor.shutdown(); } catch (Exception e) { @@ -209,77 +204,94 @@ public class SwitchHandler implements ISwitch { return this.xid.incrementAndGet(); } + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. An unique XID is generated automatically and + * inserted into the message. + * + * @param msg The OF message to be sent + * @return The XID used + */ @Override public Integer asyncSend(OFMessage msg) { - return asyncSend(msg, getNextXid()); + return asyncSend(msg, getNextXid()); + } + + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. The specified XID is inserted into the message. + * + * @param msg The OF message to be Sent + * @param xid The XID to be used in the message + * @return The XID used + */ + @Override + public Integer asyncSend(OFMessage msg, int xid) { + msg.setXid(xid); + transmitQ.add(new PriorityMessage(msg, 0)); + return xid; } + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. An unique XID is generated automatically and inserted into the + * message. + * + * @param msg The OF message to be sent + * @return The XID used + */ @Override - public Integer asyncSend(OFMessage msg, int xid) { - synchronized (outBuffer) { - /* - if ((msg.getType() != OFType.ECHO_REQUEST) && - (msg.getType() != OFType.ECHO_REPLY)) { - logger.debug("sending " + msg.getType().toString() + " to " + toString()); - } - */ - msg.setXid(xid); - int msgLen = msg.getLengthU(); - if (outBuffer.remaining() < msgLen) { - // increase the buffer size so that it can contain this message - ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer - .capacity() - + msgLen); - outBuffer.flip(); - newBuffer.put(outBuffer); - outBuffer = newBuffer; - } - msg.writeTo(outBuffer); - outBuffer.flip(); - try { - socket.write(outBuffer); - outBuffer.compact(); - if (outBuffer.position() > 0) { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_WRITE, this); - } - logger.trace("Message sent: " + msg.toString()); - } catch (Exception e) { - reportError(e); - } - } + public Integer asyncFastSend(OFMessage msg) { + return asyncFastSend(msg, getNextXid()); + } + + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. The specified XID is inserted into the message. + * + * @param msg The OF message to be sent + * @return The XID used + */ + @Override + public Integer asyncFastSend(OFMessage msg, int xid) { + msg.setXid(xid); + transmitQ.add(new PriorityMessage(msg, 1)); return xid; } - public void resumeSend() { - synchronized (outBuffer) { - try { - outBuffer.flip(); - socket.write(outBuffer); - outBuffer.compact(); - if (outBuffer.position() > 0) { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_WRITE, this); - } else { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_READ, this); - } - } catch (Exception e) { - reportError(e); - } - } + public void resumeSend() { + try { + msgReadWriteService.resumeSend(); + } catch (Exception e) { + reportError(e); + } } public void handleMessages() { - List msgs = readMessages(); + List msgs = null; + + try { + msgs = msgReadWriteService.readMessages(); + } catch (Exception e) { + reportError(e); + } + if (msgs == null) { - logger.debug(toString() + " is down"); + logger.debug("{} is down", toString()); // the connection is down, inform core reportSwitchStateChange(false); return; } for (OFMessage msg : msgs) { - logger.trace("Message received: " + msg.toString()); + logger.trace("Message received: {}", msg.toString()); /* if ((msg.getType() != OFType.ECHO_REQUEST) && (msg.getType() != OFType.ECHO_REPLY)) { @@ -293,7 +305,7 @@ public class SwitchHandler implements ISwitch { // send feature request OFMessage featureRequest = factory .getMessage(OFType.FEATURES_REQUEST); - asyncSend(featureRequest); + asyncFastSend(featureRequest); // delete all pre-existing flows OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL); OFFlowMod flowMod = (OFFlowMod) factory @@ -301,14 +313,14 @@ public class SwitchHandler implements ISwitch { flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE) .setOutPort(OFPort.OFPP_NONE).setLength( (short) OFFlowMod.MINIMUM_LENGTH); - asyncSend(flowMod); + asyncFastSend(flowMod); this.state = SwitchState.WAIT_FEATURES_REPLY; startSwitchTimer(); break; case ECHO_REQUEST: OFEchoReply echoReply = (OFEchoReply) factory .getMessage(OFType.ECHO_REPLY); - asyncSend(echoReply); + asyncFastSend(echoReply); break; case ECHO_REPLY: this.probeSent = false; @@ -362,28 +374,6 @@ public class SwitchHandler implements ISwitch { } - private List readMessages() { - List msgs = null; - int bytesRead; - try { - bytesRead = socket.read(inBuffer); - } catch (Exception e) { - reportError(e); - return null; - } - if (bytesRead == -1) { - return null; - } - inBuffer.flip(); - msgs = factory.parseMessages(inBuffer); - if (inBuffer.hasRemaining()) { - inBuffer.compact(); - } else { - inBuffer.clear(); - } - return msgs; - } - private void startSwitchTimer() { this.periodicTimer = new Timer(); this.periodicTimer.scheduleAtFixedRate(new TimerTask() { @@ -394,8 +384,7 @@ public class SwitchHandler implements ISwitch { if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) { if (probeSent) { // switch failed to respond to our probe, consider it down - logger.warn(toString() - + " is idle for too long, disconnect"); + logger.warn("{} is idle for too long, disconnect", toString()); reportSwitchStateChange(false); } else { // send a probe to see if the switch is still alive @@ -403,14 +392,14 @@ public class SwitchHandler implements ISwitch { probeSent = true; OFMessage echo = factory .getMessage(OFType.ECHO_REQUEST); - asyncSend(echo); + asyncFastSend(echo); } } else { if (state == SwitchState.WAIT_FEATURES_REPLY) { // send another features request OFMessage request = factory .getMessage(OFType.FEATURES_REQUEST); - asyncSend(request); + asyncFastSend(request); } else { if (state == SwitchState.WAIT_CONFIG_REPLY) { // send another config request @@ -418,10 +407,10 @@ public class SwitchHandler implements ISwitch { .getMessage(OFType.SET_CONFIG); config.setMissSendLength((short) 0xffff) .setLengthU(OFSetConfig.MINIMUM_LENGTH); - asyncSend(config); + asyncFastSend(config); OFMessage getConfig = factory .getMessage(OFType.GET_CONFIG_REQUEST); - asyncSend(getConfig); + asyncFastSend(getConfig); } } } @@ -439,8 +428,8 @@ public class SwitchHandler implements ISwitch { } private void reportError(Exception e) { - //logger.error(toString() + " caught Error " + e.toString()); - // notify core of this error event + logger.debug("Caught exception ", e); + // notify core of this error event and disconnect the switch ((Controller) core).takeSwitchEventError(this); } @@ -473,10 +462,10 @@ public class SwitchHandler implements ISwitch { .getMessage(OFType.SET_CONFIG); config.setMissSendLength((short) 0xffff).setLengthU( OFSetConfig.MINIMUM_LENGTH); - asyncSend(config); + asyncFastSend(config); // send config request to make sure the switch can handle the set config OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST); - asyncSend(getConfig); + asyncFastSend(getConfig); this.state = SwitchState.WAIT_CONFIG_REPLY; // inform core that a new switch is now operational reportSwitchStateChange(true); @@ -545,8 +534,7 @@ public class SwitchHandler implements ISwitch { .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS); return result; } catch (Exception e) { - logger.warn("Timeout while waiting for " + req.getType() - + " replies"); + logger.warn("Timeout while waiting for {} replies", req.getType()); result = null; // to indicate timeout has occurred return result; } @@ -573,13 +561,12 @@ public class SwitchHandler implements ISwitch { } else { // if result is not null, this means the switch can't handle this message // the result if OFError already - logger.debug("Send " + msg.getType().toString() - + " failed --> " + ((OFError) result).toString()); + logger.debug("Send {} failed --> {}", + msg.getType().toString(), ((OFError) result).toString()); } return result; } catch (Exception e) { - logger.warn("Timeout while waiting for " + msg.getType().toString() - + " reply"); + logger.warn("Timeout while waiting for {} reply", msg.getType().toString()); // convert the result into a Boolean with value false status = false; result = status; @@ -637,7 +624,7 @@ public class SwitchHandler implements ISwitch { worker.wakeup(); } } - + @Override public Map getPhysicalPorts() { return this.physicalPorts; @@ -715,4 +702,65 @@ public class SwitchHandler implements ISwitch { } return result; } + + /* + * Transmit thread polls the message out of the priority queue and invokes + * messaging service to transmit it over the socket channel + */ + class PriorityMessageTransmit implements Runnable { + public void run() { + while (true) { + try { + if (!transmitQ.isEmpty()) { + PriorityMessage pmsg = transmitQ.poll(); + msgReadWriteService.asyncSend(pmsg.msg); + logger.trace("Message sent: {}", pmsg.toString()); + } + Thread.sleep(10); + } catch (Exception e) { + reportError(e); + } + } + } + } + + /* + * Setup and start the transmit thread + */ + private void startTransmitThread() { + this.transmitQ = new PriorityBlockingQueue(11, + new Comparator() { + public int compare(PriorityMessage p1, PriorityMessage p2) { + return p2.priority - p1.priority; + } + }); + this.transmitThread = new Thread(new PriorityMessageTransmit()); + this.transmitThread.start(); + } + + /* + * Setup communication services + */ + private void setupCommChannel() throws Exception { + this.selector = SelectorProvider.provider().openSelector(); + this.socket.configureBlocking(false); + this.socket.socket().setTcpNoDelay(true); + this.msgReadWriteService = getMessageReadWriteService(); + } + + private void sendFirstHello() { + try { + OFMessage msg = factory.getMessage(OFType.HELLO); + asyncFastSend(msg); + } catch (Exception e) { + reportError(e); + } + } + + private IMessageReadWrite getMessageReadWriteService() throws Exception { + String str = System.getProperty("secureChannelEnabled"); + return ((str != null) && (str.equalsIgnoreCase("true"))) ? + new SecureMessageReadWriteService(socket, selector) : + new MessageReadWriteService(socket, selector); + } } -- 2.36.6