# of.listenPort=6633
# The time (in milliseconds) the controller will wait for a response after sending a Barrier Request or a Statistic Request message (default 2000 msec)
# of.messageResponseTimer=2000
+
+# TLS configuration
+# To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
+# The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
+# entries, including switches' Certification Authority (CA) certificates. For example,
+# secureChannelEnabled=true
+# controllerKeyStore=./configuration/ctlKeyStore
+# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation)
+# controllerTrustStore=./configuration/ctlTrustStore
+# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation)
+
+secureChannelEnabled=false
+controllerKeyStore=
+controllerKeyStorePassword=
+controllerTrustStore=
+controllerTrustStorePassword=
<logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.InventoryServiceShim" level="INFO"/>
<logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.TopologyServices" level="INFO"/>
<logger name="org.opendaylight.controller.protocol_plugin.openflow.internal.TopologyServiceShim" level="INFO"/>
+ <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.Controller" level="INFO"/>
+ <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchHandler" level="INFO"/>
+ <logger name="org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchIOSecureService" level="INFO"/>
<!-- SAL -->
<logger name="org.opendaylight.controller.sal" level="INFO"/>
<logger name="org.opendaylight.controller.sal.implementation" level="INFO"/>
org.apache.felix.dm,
org.slf4j,
org.eclipse.osgi.framework.console,
- org.osgi.framework
+ org.osgi.framework,
+ javax.net.ssl
</Import-Package>
<Export-Package>
org.opendaylight.controller.protocol_plugin.openflow.internal
--- /dev/null
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core;
+
+import java.util.List;
+
+import org.openflow.protocol.OFMessage;
+
+/**
+ * This interface defines low level routines to read/write messages on an open
+ * socket channel. If secure communication is desired, these methods also perform
+ * encryption and decryption of the network data.
+ */
+public interface IMessageReadWrite {
+ /**
+ * Sends the OF message out over the socket channel. For secure
+ * communication, the data will be encrypted.
+ *
+ * @param msg OF message to be sent
+ * @throws Exception
+ */
+ public void asyncSend(OFMessage msg) throws Exception;
+
+ /**
+ * Resumes sending the remaining messages in the outgoing buffer
+ * @throws Exception
+ */
+ public void resumeSend() throws Exception;
+
+ /**
+ * Reads the incoming network data from the socket and retrieves the OF
+ * messages. For secure communication, the data will be decrypted first.
+ *
+ * @return list of OF messages
+ * @throws Exception
+ */
+ public List<OFMessage> readMessages() throws Exception;
+}
public Date getConnectedDate();
/**
- * Sends the message. A unique XID is generated automatically and inserted into the message.
- * @param msg TheOF message to be sent
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. An unique XID is generated automatically and
+ * inserted into the message.
+ *
+ * @param msg The OF message to be sent
* @return The XID used
*/
public Integer asyncSend(OFMessage msg);
/**
- * Sends the message with the specified XID.
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. The specified XID is inserted into the message.
+ *
* @param msg The OF message to be Sent
* @param xid The XID to be used in the message
* @return The XID used
*/
public Integer asyncSend(OFMessage msg, int xid);
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. An unique XID is generated automatically and inserted into the
+ * message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
+ public Integer asyncFastSend(OFMessage msg);
+
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. The specified XID is inserted into the message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
+ public Integer asyncFastSend(OFMessage msg, int xid);
+
/**
* Sends the OF message followed by a Barrier Request with a unique XID which is automatically generated,
* and waits for a result from the switch.
--- /dev/null
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements methods to read/write messages over an established
+ * socket channel. The data exchange is in clear text format.
+ */
+public class MessageReadWriteService implements IMessageReadWrite {
+ private static final Logger logger = LoggerFactory
+ .getLogger(MessageReadWriteService.class);
+ private static final int bufferSize = 1024 * 1024;
+
+ private Selector selector;
+ private SelectionKey clientSelectionKey;
+ private SocketChannel socket;
+ private ByteBuffer inBuffer;
+ private ByteBuffer outBuffer;
+ private BasicFactory factory;
+
+ public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
+ this.socket = socket;
+ this.selector = selector;
+ this.factory = new BasicFactory();
+ this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
+ this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_READ);
+ }
+
+ /**
+ * Sends the OF message out over the socket channel.
+ *
+ * @param msg OF message to be sent
+ * @throws Exception
+ */
+ @Override
+ public void asyncSend(OFMessage msg) throws IOException {
+ synchronized (outBuffer) {
+ int msgLen = msg.getLengthU();
+ if (outBuffer.remaining() < msgLen) {
+ // increase the buffer size so that it can contain this message
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
+ .capacity()
+ + msgLen);
+ outBuffer.flip();
+ newBuffer.put(outBuffer);
+ outBuffer = newBuffer;
+ }
+ msg.writeTo(outBuffer);
+
+ if (!socket.isOpen()) {
+ return;
+ }
+
+ outBuffer.flip();
+ socket.write(outBuffer);
+ outBuffer.compact();
+ if (outBuffer.position() > 0) {
+ this.clientSelectionKey = this.socket.register(
+ this.selector, SelectionKey.OP_WRITE, this);
+ }
+ logger.trace("Message sent: {}", msg.toString());
+ }
+ }
+
+ /**
+ * Resumes sending the remaining messages in the outgoing buffer
+ * @throws Exception
+ */
+ @Override
+ public void resumeSend() throws IOException {
+ synchronized (outBuffer) {
+ if (!socket.isOpen()) {
+ return;
+ }
+
+ outBuffer.flip();
+ socket.write(outBuffer);
+ outBuffer.compact();
+ if (outBuffer.position() > 0) {
+ this.clientSelectionKey = this.socket.register(
+ this.selector, SelectionKey.OP_WRITE, this);
+ } else {
+ this.clientSelectionKey = this.socket.register(
+ this.selector, SelectionKey.OP_READ, this);
+ }
+ }
+ }
+
+ /**
+ * Reads the incoming network data from the socket and retrieves the OF
+ * messages.
+ *
+ * @return list of OF messages
+ * @throws Exception
+ */
+ @Override
+ public List<OFMessage> readMessages() throws IOException {
+ if (!socket.isOpen()) {
+ return null;
+ }
+
+ List<OFMessage> msgs = null;
+ int bytesRead = -1;
+ bytesRead = socket.read(inBuffer);
+ if (bytesRead < 0) {
+ throw new AsynchronousCloseException();
+ }
+
+ inBuffer.flip();
+ msgs = factory.parseMessages(inBuffer);
+ if (inBuffer.hasRemaining()) {
+ inBuffer.compact();
+ } else {
+ inBuffer.clear();
+ }
+ return msgs;
+ }
+}
--- /dev/null
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.openflow.protocol.OFMessage;
+
+/**
+ * This class describes an OpenFlow message with priority
+ */
+class PriorityMessage {
+ OFMessage msg;
+ int priority;
+
+ public PriorityMessage(OFMessage msg, int priority) {
+ this.msg = msg;
+ this.priority = priority;
+ }
+
+ public OFMessage getMsg() {
+ return msg;
+ }
+
+ public void setMsg(OFMessage msg) {
+ this.msg = msg;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ @Override
+ public int hashCode() {
+ return HashCodeBuilder.reflectionHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public String toString() {
+ return "PriorityMessage[" + ReflectionToStringBuilder.toString(this) + "]";
+ }
+}
--- /dev/null
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.List;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements methods to read/write messages over an established
+ * socket channel. The data exchange is encrypted/decrypted by SSLEngine.
+ */
+public class SecureMessageReadWriteService implements IMessageReadWrite {
+ private static final Logger logger = LoggerFactory
+ .getLogger(SecureMessageReadWriteService.class);
+
+ private Selector selector;
+ private SelectionKey clientSelectionKey;
+ private SocketChannel socket;
+ private BasicFactory factory;
+
+ private SSLEngine sslEngine;
+ private SSLEngineResult sslEngineResult; // results from sslEngine last operation
+ private ByteBuffer myAppData; // clear text message to be sent
+ private ByteBuffer myNetData; // encrypted message to be sent
+ private ByteBuffer peerAppData; // clear text message received from the switch
+ private ByteBuffer peerNetData; // encrypted message from the switch
+
+ public SecureMessageReadWriteService(SocketChannel socket, Selector selector) throws Exception {
+ this.socket = socket;
+ this.selector = selector;
+ this.factory = new BasicFactory();
+
+ createSecureChannel(socket);
+ createBuffers(sslEngine);
+ }
+
+ /**
+ * Bring up secure channel using SSL Engine
+ *
+ * @param socket TCP socket channel
+ * @throws Exception
+ */
+ private void createSecureChannel(SocketChannel socket) throws Exception {
+ String keyStoreFile = System.getProperty("controllerKeyStore");
+ String keyStorePassword = System.getProperty("controllerKeyStorePassword");
+ String trustStoreFile = System.getProperty("controllerTrustStore");;
+ String trustStorePassword = System.getProperty("controllerTrustStorePassword");;
+
+ KeyStore ks = KeyStore.getInstance("JKS");
+ KeyStore ts = KeyStore.getInstance("JKS");
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+ ks.load(new FileInputStream(keyStoreFile), keyStorePassword.toCharArray());
+ ts.load(new FileInputStream(trustStoreFile), trustStorePassword.toCharArray());
+ kmf.init(ks, keyStorePassword.toCharArray());
+ tmf.init(ts);
+
+ SecureRandom random = new SecureRandom();
+ random.nextInt();
+
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random);
+ sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(false);
+ sslEngine.setNeedClientAuth(true);
+
+ // Do initial handshake
+ doHandshake(socket, sslEngine);
+
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_READ);
+ }
+
+ /**
+ * Sends the OF message out over the socket channel. The message is
+ * encrypted by SSL Engine.
+ *
+ * @param msg OF message to be sent
+ * @throws Exception
+ */
+ @Override
+ public void asyncSend(OFMessage msg) throws Exception {
+ synchronized (myAppData) {
+ int msgLen = msg.getLengthU();
+ if (myAppData.remaining() < msgLen) {
+ // increase the buffer size so that it can contain this message
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(myAppData
+ .capacity()
+ + msgLen);
+ myAppData.flip();
+ newBuffer.put(myAppData);
+ myAppData = newBuffer;
+ }
+ msg.writeTo(myAppData);
+ myAppData.flip();
+ sslEngineResult = sslEngine.wrap(myAppData, myNetData);
+ logger.trace("asyncSend sslEngine wrap: {}", sslEngineResult);
+ runDelegatedTasks(sslEngineResult, sslEngine);
+
+ if (!socket.isOpen()) {
+ return;
+ }
+
+ myNetData.flip();
+ socket.write(myNetData);
+ if (myNetData.hasRemaining()) {
+ myNetData.compact();
+ } else {
+ myNetData.clear();
+ }
+
+ if (myAppData.hasRemaining()) {
+ myAppData.compact();
+ this.clientSelectionKey = this.socket.register(
+ this.selector, SelectionKey.OP_WRITE, this);
+ } else {
+ myAppData.clear();
+ this.clientSelectionKey = this.socket.register(
+ this.selector, SelectionKey.OP_READ, this);
+ }
+
+ logger.trace("Message sent: {}", msg.toString());
+ }
+ }
+
+ /**
+ * Resumes sending the remaining messages in the outgoing buffer
+ * @throws Exception
+ */
+ @Override
+ public void resumeSend() throws Exception {
+ synchronized (myAppData) {
+ myAppData.flip();
+ sslEngineResult = sslEngine.wrap(myAppData, myNetData);
+ logger.trace("resumeSend sslEngine wrap: {}", sslEngineResult);
+ runDelegatedTasks(sslEngineResult, sslEngine);
+
+ if (!socket.isOpen()) {
+ return;
+ }
+
+ myNetData.flip();
+ socket.write(myNetData);
+ if (myNetData.hasRemaining()) {
+ myNetData.compact();
+ } else {
+ myNetData.clear();
+ }
+
+ if (myAppData.hasRemaining()) {
+ myAppData.compact();
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_WRITE, this);
+ } else {
+ myAppData.clear();
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_READ, this);
+ }
+ }
+ }
+
+ /**
+ * Reads the incoming network data from the socket, decryptes them and then
+ * retrieves the OF messages.
+ *
+ * @return list of OF messages
+ * @throws Exception
+ */
+ @Override
+ public List<OFMessage> readMessages() throws Exception {
+ if (!socket.isOpen()) {
+ return null;
+ }
+
+ List<OFMessage> msgs = null;
+ int bytesRead = -1;
+ int countDown = 50;
+
+ bytesRead = socket.read(peerNetData);
+ if (bytesRead < 0) {
+ throw new AsynchronousCloseException();
+ }
+
+ do {
+ peerNetData.flip();
+ sslEngineResult = sslEngine.unwrap(peerNetData, peerAppData);
+ if (peerNetData.hasRemaining()) {
+ peerNetData.compact();
+ } else {
+ peerNetData.clear();
+ }
+ logger.trace("sslEngine unwrap result: {}", sslEngineResult);
+ runDelegatedTasks(sslEngineResult, sslEngine);
+ } while ((sslEngineResult.getStatus() == SSLEngineResult.Status.OK) &&
+ peerNetData.hasRemaining() && (--countDown > 0));
+
+ if (countDown == 0) {
+ logger.trace("countDown reaches 0. peerNetData pos {} lim {}", peerNetData.position(), peerNetData.limit());
+ }
+
+ peerAppData.flip();
+ msgs = factory.parseMessages(peerAppData);
+ if (peerAppData.hasRemaining()) {
+ peerAppData.compact();
+ } else {
+ peerAppData.clear();
+ }
+
+ this.clientSelectionKey = this.socket.register(
+ this.selector, SelectionKey.OP_READ, this);
+
+ return msgs;
+ }
+
+ /**
+ * If the result indicates that we have outstanding tasks to do,
+ * go ahead and run them in this thread.
+ */
+ private void runDelegatedTasks(SSLEngineResult result,
+ SSLEngine engine) throws Exception {
+
+ if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ logger.debug("\trunning delegated task...");
+ runnable.run();
+ }
+ HandshakeStatus hsStatus = engine.getHandshakeStatus();
+ if (hsStatus == HandshakeStatus.NEED_TASK) {
+ throw new Exception(
+ "handshake shouldn't need additional tasks");
+ }
+ logger.debug("\tnew HandshakeStatus: {}", hsStatus);
+ }
+ }
+
+ private void doHandshake(SocketChannel socket, SSLEngine engine) throws Exception {
+ SSLSession session = engine.getSession();
+ ByteBuffer myAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+ ByteBuffer peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+ ByteBuffer myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+ ByteBuffer peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+
+ // Begin handshake
+ engine.beginHandshake();
+ SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus();
+
+ // Process handshaking message
+ while (hs != SSLEngineResult.HandshakeStatus.FINISHED &&
+ hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+ switch (hs) {
+ case NEED_UNWRAP:
+ // Receive handshaking data from peer
+ if (socket.read(peerNetData) < 0) {
+ throw new AsynchronousCloseException();
+ }
+
+ // Process incoming handshaking data
+ peerNetData.flip();
+ SSLEngineResult res = engine.unwrap(peerNetData, peerAppData);
+ peerNetData.compact();
+ hs = res.getHandshakeStatus();
+
+ // Check status
+ switch (res.getStatus()) {
+ case OK :
+ // Handle OK status
+ break;
+ }
+ break;
+
+ case NEED_WRAP :
+ // Empty the local network packet buffer.
+ myNetData.clear();
+
+ // Generate handshaking data
+ res = engine.wrap(myAppData, myNetData);
+ hs = res.getHandshakeStatus();
+
+ // Check status
+ switch (res.getStatus()) {
+ case OK :
+ myNetData.flip();
+
+ // Send the handshaking data to peer
+ while (myNetData.hasRemaining()) {
+ if (socket.write(myNetData) < 0) {
+ throw new AsynchronousCloseException();
+ }
+ }
+ break;
+ }
+ break;
+
+ case NEED_TASK :
+ // Handle blocking tasks
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ logger.debug("\trunning delegated task...");
+ runnable.run();
+ }
+ hs = engine.getHandshakeStatus();
+ if (hs == HandshakeStatus.NEED_TASK) {
+ throw new Exception(
+ "handshake shouldn't need additional tasks");
+ }
+ logger.debug("\tnew HandshakeStatus: {}", hs);
+ break;
+ }
+ }
+ }
+
+ private void createBuffers(SSLEngine engine) {
+ SSLSession session = engine.getSession();
+ this.myAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+ this.peerAppData = ByteBuffer.allocate(session.getApplicationBufferSize());
+ this.myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+ this.peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
+ }
+}
package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
-import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
import org.openflow.protocol.OFBarrierReply;
import org.openflow.protocol.OFEchoReply;
import org.openflow.protocol.OFError;
private static final int SWITCH_LIVENESS_TIMER = 5000;
private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
private int MESSAGE_RESPONSE_TIMER = 2000;
- private static final int bufferSize = 1024 * 1024;
private String instanceName;
private ISwitch thisISwitch;
private Byte tables;
private Integer actions;
private Selector selector;
- private SelectionKey clientSelectionKey;
private SocketChannel socket;
- private ByteBuffer inBuffer;
- private ByteBuffer outBuffer;
private BasicFactory factory;
private AtomicInteger xid;
private SwitchState state;
private ExecutorService executor;
private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
private boolean running;
+ private IMessageReadWrite msgReadWriteService;
private Thread switchHandlerThread;
private Integer responseTimerValue;
-
+ private PriorityBlockingQueue<PriorityMessage> transmitQ;
+ private Thread transmitThread;
+
private enum SwitchState {
NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
3);
this.periodicTimer = null;
this.executor = Executors.newFixedThreadPool(4);
this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
- this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
- this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
String rTimer = System.getProperty("of.messageResponseTimer");
if (rTimer != null) {
- try {
- responseTimerValue = Integer.decode(rTimer);
- } catch (NumberFormatException e) {
- logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default("
- + MESSAGE_RESPONSE_TIMER+ ")");
- }
+ try {
+ responseTimerValue = Integer.decode(rTimer);
+ } catch (NumberFormatException e) {
+ logger.warn("Invalid of.messageResponseTimer: {} use default({})",
+ rTimer, MESSAGE_RESPONSE_TIMER);
+ }
}
- }
+ }
public void start() {
try {
- this.selector = SelectorProvider.provider().openSelector();
- this.socket.configureBlocking(false);
- this.socket.socket().setTcpNoDelay(true);
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ);
+ startTransmitThread();
+ setupCommChannel();
+ sendFirstHello();
startHandlerThread();
} catch (Exception e) {
reportError(e);
- return;
}
}
private void startHandlerThread() {
- OFMessage msg = factory.getMessage(OFType.HELLO);
- asyncSend(msg);
switchHandlerThread = new Thread(new Runnable() {
@Override
public void run() {
running = true;
while (running) {
try {
- // wait for an incoming connection
+ // wait for an incoming connection
selector.select(0);
Iterator<SelectionKey> selectedKeys = selector
.selectedKeys().iterator();
running = false;
selector.wakeup();
cancelSwitchTimer();
- this.clientSelectionKey.cancel();
+ this.selector.close();
this.socket.close();
executor.shutdown();
} catch (Exception e) {
return this.xid.incrementAndGet();
}
+ /**
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. An unique XID is generated automatically and
+ * inserted into the message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
@Override
public Integer asyncSend(OFMessage msg) {
- return asyncSend(msg, getNextXid());
+ return asyncSend(msg, getNextXid());
+ }
+
+ /**
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. The specified XID is inserted into the message.
+ *
+ * @param msg The OF message to be Sent
+ * @param xid The XID to be used in the message
+ * @return The XID used
+ */
+ @Override
+ public Integer asyncSend(OFMessage msg, int xid) {
+ msg.setXid(xid);
+ transmitQ.add(new PriorityMessage(msg, 0));
+ return xid;
}
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. An unique XID is generated automatically and inserted into the
+ * message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
@Override
- public Integer asyncSend(OFMessage msg, int xid) {
- synchronized (outBuffer) {
- /*
- if ((msg.getType() != OFType.ECHO_REQUEST) &&
- (msg.getType() != OFType.ECHO_REPLY)) {
- logger.debug("sending " + msg.getType().toString() + " to " + toString());
- }
- */
- msg.setXid(xid);
- int msgLen = msg.getLengthU();
- if (outBuffer.remaining() < msgLen) {
- // increase the buffer size so that it can contain this message
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
- .capacity()
- + msgLen);
- outBuffer.flip();
- newBuffer.put(outBuffer);
- outBuffer = newBuffer;
- }
- msg.writeTo(outBuffer);
- outBuffer.flip();
- try {
- socket.write(outBuffer);
- outBuffer.compact();
- if (outBuffer.position() > 0) {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_WRITE, this);
- }
- logger.trace("Message sent: " + msg.toString());
- } catch (Exception e) {
- reportError(e);
- }
- }
+ public Integer asyncFastSend(OFMessage msg) {
+ return asyncFastSend(msg, getNextXid());
+ }
+
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. The specified XID is inserted into the message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
+ @Override
+ public Integer asyncFastSend(OFMessage msg, int xid) {
+ msg.setXid(xid);
+ transmitQ.add(new PriorityMessage(msg, 1));
return xid;
}
- public void resumeSend() {
- synchronized (outBuffer) {
- try {
- outBuffer.flip();
- socket.write(outBuffer);
- outBuffer.compact();
- if (outBuffer.position() > 0) {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_WRITE, this);
- } else {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_READ, this);
- }
- } catch (Exception e) {
- reportError(e);
- }
- }
+ public void resumeSend() {
+ try {
+ msgReadWriteService.resumeSend();
+ } catch (Exception e) {
+ reportError(e);
+ }
}
public void handleMessages() {
- List<OFMessage> msgs = readMessages();
+ List<OFMessage> msgs = null;
+
+ try {
+ msgs = msgReadWriteService.readMessages();
+ } catch (Exception e) {
+ reportError(e);
+ }
+
if (msgs == null) {
- logger.debug(toString() + " is down");
+ logger.debug("{} is down", toString());
// the connection is down, inform core
reportSwitchStateChange(false);
return;
}
for (OFMessage msg : msgs) {
- logger.trace("Message received: " + msg.toString());
+ logger.trace("Message received: {}", msg.toString());
/*
if ((msg.getType() != OFType.ECHO_REQUEST) &&
(msg.getType() != OFType.ECHO_REPLY)) {
// send feature request
OFMessage featureRequest = factory
.getMessage(OFType.FEATURES_REQUEST);
- asyncSend(featureRequest);
+ asyncFastSend(featureRequest);
// delete all pre-existing flows
OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
OFFlowMod flowMod = (OFFlowMod) factory
flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
.setOutPort(OFPort.OFPP_NONE).setLength(
(short) OFFlowMod.MINIMUM_LENGTH);
- asyncSend(flowMod);
+ asyncFastSend(flowMod);
this.state = SwitchState.WAIT_FEATURES_REPLY;
startSwitchTimer();
break;
case ECHO_REQUEST:
OFEchoReply echoReply = (OFEchoReply) factory
.getMessage(OFType.ECHO_REPLY);
- asyncSend(echoReply);
+ asyncFastSend(echoReply);
break;
case ECHO_REPLY:
this.probeSent = false;
}
- private List<OFMessage> readMessages() {
- List<OFMessage> msgs = null;
- int bytesRead;
- try {
- bytesRead = socket.read(inBuffer);
- } catch (Exception e) {
- reportError(e);
- return null;
- }
- if (bytesRead == -1) {
- return null;
- }
- inBuffer.flip();
- msgs = factory.parseMessages(inBuffer);
- if (inBuffer.hasRemaining()) {
- inBuffer.compact();
- } else {
- inBuffer.clear();
- }
- return msgs;
- }
-
private void startSwitchTimer() {
this.periodicTimer = new Timer();
this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
if (probeSent) {
// switch failed to respond to our probe, consider it down
- logger.warn(toString()
- + " is idle for too long, disconnect");
+ logger.warn("{} is idle for too long, disconnect", toString());
reportSwitchStateChange(false);
} else {
// send a probe to see if the switch is still alive
probeSent = true;
OFMessage echo = factory
.getMessage(OFType.ECHO_REQUEST);
- asyncSend(echo);
+ asyncFastSend(echo);
}
} else {
if (state == SwitchState.WAIT_FEATURES_REPLY) {
// send another features request
OFMessage request = factory
.getMessage(OFType.FEATURES_REQUEST);
- asyncSend(request);
+ asyncFastSend(request);
} else {
if (state == SwitchState.WAIT_CONFIG_REPLY) {
// send another config request
.getMessage(OFType.SET_CONFIG);
config.setMissSendLength((short) 0xffff)
.setLengthU(OFSetConfig.MINIMUM_LENGTH);
- asyncSend(config);
+ asyncFastSend(config);
OFMessage getConfig = factory
.getMessage(OFType.GET_CONFIG_REQUEST);
- asyncSend(getConfig);
+ asyncFastSend(getConfig);
}
}
}
}
private void reportError(Exception e) {
- //logger.error(toString() + " caught Error " + e.toString());
- // notify core of this error event
+ logger.debug("Caught exception ", e);
+ // notify core of this error event and disconnect the switch
((Controller) core).takeSwitchEventError(this);
}
.getMessage(OFType.SET_CONFIG);
config.setMissSendLength((short) 0xffff).setLengthU(
OFSetConfig.MINIMUM_LENGTH);
- asyncSend(config);
+ asyncFastSend(config);
// send config request to make sure the switch can handle the set config
OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
- asyncSend(getConfig);
+ asyncFastSend(getConfig);
this.state = SwitchState.WAIT_CONFIG_REPLY;
// inform core that a new switch is now operational
reportSwitchStateChange(true);
.get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for " + req.getType()
- + " replies");
+ logger.warn("Timeout while waiting for {} replies", req.getType());
result = null; // to indicate timeout has occurred
return result;
}
} else {
// if result is not null, this means the switch can't handle this message
// the result if OFError already
- logger.debug("Send " + msg.getType().toString()
- + " failed --> " + ((OFError) result).toString());
+ logger.debug("Send {} failed --> {}",
+ msg.getType().toString(), ((OFError) result).toString());
}
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for " + msg.getType().toString()
- + " reply");
+ logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
// convert the result into a Boolean with value false
status = false;
result = status;
worker.wakeup();
}
}
-
+
@Override
public Map<Short, OFPhysicalPort> getPhysicalPorts() {
return this.physicalPorts;
}
return result;
}
+
+ /*
+ * Transmit thread polls the message out of the priority queue and invokes
+ * messaging service to transmit it over the socket channel
+ */
+ class PriorityMessageTransmit implements Runnable {
+ public void run() {
+ while (true) {
+ try {
+ if (!transmitQ.isEmpty()) {
+ PriorityMessage pmsg = transmitQ.poll();
+ msgReadWriteService.asyncSend(pmsg.msg);
+ logger.trace("Message sent: {}", pmsg.toString());
+ }
+ Thread.sleep(10);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+ }
+ }
+
+ /*
+ * Setup and start the transmit thread
+ */
+ private void startTransmitThread() {
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
+ new Comparator<PriorityMessage>() {
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ return p2.priority - p1.priority;
+ }
+ });
+ this.transmitThread = new Thread(new PriorityMessageTransmit());
+ this.transmitThread.start();
+ }
+
+ /*
+ * Setup communication services
+ */
+ private void setupCommChannel() throws Exception {
+ this.selector = SelectorProvider.provider().openSelector();
+ this.socket.configureBlocking(false);
+ this.socket.socket().setTcpNoDelay(true);
+ this.msgReadWriteService = getMessageReadWriteService();
+ }
+
+ private void sendFirstHello() {
+ try {
+ OFMessage msg = factory.getMessage(OFType.HELLO);
+ asyncFastSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
+ private IMessageReadWrite getMessageReadWriteService() throws Exception {
+ String str = System.getProperty("secureChannelEnabled");
+ return ((str != null) && (str.equalsIgnoreCase("true"))) ?
+ new SecureMessageReadWriteService(socket, selector) :
+ new MessageReadWriteService(socket, selector);
+ }
}