package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
-import java.nio.ByteBuffer;
+import java.net.SocketException;
+import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
import org.openflow.protocol.OFBarrierReply;
import org.openflow.protocol.OFEchoReply;
import org.openflow.protocol.OFError;
private static final int SWITCH_LIVENESS_TIMER = 5000;
private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
private int MESSAGE_RESPONSE_TIMER = 2000;
- private static final int bufferSize = 1024 * 1024;
private String instanceName;
private ISwitch thisISwitch;
private Byte tables;
private Integer actions;
private Selector selector;
- private SelectionKey clientSelectionKey;
private SocketChannel socket;
- private ByteBuffer inBuffer;
- private ByteBuffer outBuffer;
private BasicFactory factory;
private AtomicInteger xid;
private SwitchState state;
private ExecutorService executor;
private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
private boolean running;
+ private IMessageReadWrite msgReadWriteService;
private Thread switchHandlerThread;
private Integer responseTimerValue;
-
+ private PriorityBlockingQueue<PriorityMessage> transmitQ;
+ private Thread transmitThread;
+
private enum SwitchState {
NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
3);
this.periodicTimer = null;
this.executor = Executors.newFixedThreadPool(4);
this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
- this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
- this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
String rTimer = System.getProperty("of.messageResponseTimer");
if (rTimer != null) {
- try {
- responseTimerValue = Integer.decode(rTimer);
- } catch (NumberFormatException e) {
- logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default("
- + MESSAGE_RESPONSE_TIMER+ ")");
- }
+ try {
+ responseTimerValue = Integer.decode(rTimer);
+ } catch (NumberFormatException e) {
+ logger.warn("Invalid of.messageResponseTimer: {} use default({})",
+ rTimer, MESSAGE_RESPONSE_TIMER);
+ }
}
- }
+ }
public void start() {
try {
- this.selector = SelectorProvider.provider().openSelector();
- this.socket.configureBlocking(false);
- this.socket.socket().setTcpNoDelay(true);
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ);
+ startTransmitThread();
+ setupCommChannel();
+ sendFirstHello();
startHandlerThread();
} catch (Exception e) {
reportError(e);
- return;
}
}
private void startHandlerThread() {
- OFMessage msg = factory.getMessage(OFType.HELLO);
- asyncSend(msg);
switchHandlerThread = new Thread(new Runnable() {
@Override
public void run() {
running = true;
while (running) {
try {
- // wait for an incoming connection
+ // wait for an incoming connection
selector.select(0);
Iterator<SelectionKey> selectedKeys = selector
.selectedKeys().iterator();
}
public void stop() {
- try {
- running = false;
- selector.wakeup();
- cancelSwitchTimer();
- this.clientSelectionKey.cancel();
- this.socket.close();
- executor.shutdown();
- } catch (Exception e) {
- // do nothing since we are shutting down.
- return;
- }
+ running = false;
+ cancelSwitchTimer();
+ try {
+ selector.wakeup();
+ selector.close();
+ } catch (Exception e) {
+ }
+ try {
+ socket.close();
+ } catch (Exception e) {
+ }
+ try {
+ msgReadWriteService.stop();
+ } catch (Exception e) {
+ }
+ executor.shutdown();
+
+ selector = null;
+ socket = null;
+ msgReadWriteService = null;
+
+ if (switchHandlerThread != null) {
+ switchHandlerThread.interrupt();
+ }
+ if (transmitThread != null) {
+ transmitThread.interrupt();
+ }
}
@Override
return this.xid.incrementAndGet();
}
+ /**
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. An unique XID is generated automatically and
+ * inserted into the message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
@Override
public Integer asyncSend(OFMessage msg) {
- return asyncSend(msg, getNextXid());
+ return asyncSend(msg, getNextXid());
+ }
+
+ /**
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. The specified XID is inserted into the message.
+ *
+ * @param msg The OF message to be Sent
+ * @param xid The XID to be used in the message
+ * @return The XID used
+ */
+ @Override
+ public Integer asyncSend(OFMessage msg, int xid) {
+ msg.setXid(xid);
+ if (transmitQ != null) {
+ transmitQ.add(new PriorityMessage(msg, 0));
+ }
+ return xid;
}
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. An unique XID is generated automatically and inserted into the
+ * message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
@Override
- public Integer asyncSend(OFMessage msg, int xid) {
- synchronized (outBuffer) {
- /*
- if ((msg.getType() != OFType.ECHO_REQUEST) &&
- (msg.getType() != OFType.ECHO_REPLY)) {
- logger.debug("sending " + msg.getType().toString() + " to " + toString());
- }
- */
- msg.setXid(xid);
- int msgLen = msg.getLengthU();
- if (outBuffer.remaining() < msgLen) {
- // increase the buffer size so that it can contain this message
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
- .capacity()
- + msgLen);
- outBuffer.flip();
- newBuffer.put(outBuffer);
- outBuffer = newBuffer;
- }
- msg.writeTo(outBuffer);
- outBuffer.flip();
- try {
- socket.write(outBuffer);
- outBuffer.compact();
- if (outBuffer.position() > 0) {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_WRITE, this);
- }
- logger.trace("Message sent: " + msg.toString());
- } catch (Exception e) {
- reportError(e);
- }
- }
+ public Integer asyncFastSend(OFMessage msg) {
+ return asyncFastSend(msg, getNextXid());
+ }
+
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. The specified XID is inserted into the message.
+ *
+ * @param msg The OF message to be sent
+ * @return The XID used
+ */
+ @Override
+ public Integer asyncFastSend(OFMessage msg, int xid) {
+ msg.setXid(xid);
+ if (transmitQ != null) {
+ transmitQ.add(new PriorityMessage(msg, 1));
+ }
return xid;
}
- public void resumeSend() {
- synchronized (outBuffer) {
- try {
- outBuffer.flip();
- socket.write(outBuffer);
- outBuffer.compact();
- if (outBuffer.position() > 0) {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_WRITE, this);
- } else {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_READ, this);
- }
- } catch (Exception e) {
- reportError(e);
- }
- }
+ public void resumeSend() {
+ try {
+ if (msgReadWriteService != null) {
+ msgReadWriteService.resumeSend();
+ }
+ } catch (Exception e) {
+ reportError(e);
+ }
}
public void handleMessages() {
- List<OFMessage> msgs = readMessages();
+ List<OFMessage> msgs = null;
+
+ try {
+ msgs = msgReadWriteService.readMessages();
+ } catch (Exception e) {
+ reportError(e);
+ }
+
if (msgs == null) {
- logger.debug(toString() + " is down");
+ logger.debug("{} is down", toString());
// the connection is down, inform core
reportSwitchStateChange(false);
return;
}
for (OFMessage msg : msgs) {
- logger.trace("Message received: " + msg.toString());
+ logger.trace("Message received: {}", msg.toString());
/*
if ((msg.getType() != OFType.ECHO_REQUEST) &&
(msg.getType() != OFType.ECHO_REPLY)) {
// send feature request
OFMessage featureRequest = factory
.getMessage(OFType.FEATURES_REQUEST);
- asyncSend(featureRequest);
+ asyncFastSend(featureRequest);
// delete all pre-existing flows
OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
OFFlowMod flowMod = (OFFlowMod) factory
flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
.setOutPort(OFPort.OFPP_NONE).setLength(
(short) OFFlowMod.MINIMUM_LENGTH);
- asyncSend(flowMod);
+ asyncFastSend(flowMod);
this.state = SwitchState.WAIT_FEATURES_REPLY;
startSwitchTimer();
break;
case ECHO_REQUEST:
OFEchoReply echoReply = (OFEchoReply) factory
.getMessage(OFType.ECHO_REPLY);
- asyncSend(echoReply);
+ asyncFastSend(echoReply);
break;
case ECHO_REPLY:
this.probeSent = false;
}
- private List<OFMessage> readMessages() {
- List<OFMessage> msgs = null;
- int bytesRead;
- try {
- bytesRead = socket.read(inBuffer);
- } catch (Exception e) {
- reportError(e);
- return null;
- }
- if (bytesRead == -1) {
- return null;
- }
- inBuffer.flip();
- msgs = factory.parseMessages(inBuffer);
- if (inBuffer.hasRemaining()) {
- inBuffer.compact();
- } else {
- inBuffer.clear();
- }
- return msgs;
- }
-
private void startSwitchTimer() {
this.periodicTimer = new Timer();
this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
if (probeSent) {
// switch failed to respond to our probe, consider it down
- logger.warn(toString()
- + " is idle for too long, disconnect");
+ logger.warn("{} is idle for too long, disconnect", toString());
reportSwitchStateChange(false);
} else {
// send a probe to see if the switch is still alive
probeSent = true;
OFMessage echo = factory
.getMessage(OFType.ECHO_REQUEST);
- asyncSend(echo);
+ asyncFastSend(echo);
}
} else {
if (state == SwitchState.WAIT_FEATURES_REPLY) {
// send another features request
OFMessage request = factory
.getMessage(OFType.FEATURES_REQUEST);
- asyncSend(request);
+ asyncFastSend(request);
} else {
if (state == SwitchState.WAIT_CONFIG_REPLY) {
// send another config request
.getMessage(OFType.SET_CONFIG);
config.setMissSendLength((short) 0xffff)
.setLengthU(OFSetConfig.MINIMUM_LENGTH);
- asyncSend(config);
+ asyncFastSend(config);
OFMessage getConfig = factory
.getMessage(OFType.GET_CONFIG_REQUEST);
- asyncSend(getConfig);
+ asyncFastSend(getConfig);
}
}
}
}
private void reportError(Exception e) {
- //logger.error(toString() + " caught Error " + e.toString());
- // notify core of this error event
+ if (e instanceof AsynchronousCloseException ||
+ e instanceof InterruptedException ||
+ e instanceof SocketException) {
+ logger.debug("Caught exception {}", e.getMessage());
+ } else {
+ logger.warn("Caught exception {}", e.getMessage());
+ }
+ // notify core of this error event and disconnect the switch
((Controller) core).takeSwitchEventError(this);
}
.getMessage(OFType.SET_CONFIG);
config.setMissSendLength((short) 0xffff).setLengthU(
OFSetConfig.MINIMUM_LENGTH);
- asyncSend(config);
+ asyncFastSend(config);
// send config request to make sure the switch can handle the set config
OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
- asyncSend(getConfig);
+ asyncFastSend(getConfig);
this.state = SwitchState.WAIT_CONFIG_REPLY;
// inform core that a new switch is now operational
reportSwitchStateChange(true);
.get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for " + req.getType()
- + " replies");
+ logger.warn("Timeout while waiting for {} replies", req.getType());
result = null; // to indicate timeout has occurred
return result;
}
} else {
// if result is not null, this means the switch can't handle this message
// the result if OFError already
- logger.debug("Send " + msg.getType().toString()
- + " failed --> " + ((OFError) result).toString());
+ logger.debug("Send {} failed --> {}",
+ msg.getType().toString(), ((OFError) result).toString());
}
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for " + msg.getType().toString()
- + " reply");
+ logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
// convert the result into a Boolean with value false
status = false;
result = status;
worker.wakeup();
}
}
-
+
@Override
public Map<Short, OFPhysicalPort> getPhysicalPorts() {
return this.physicalPorts;
}
return result;
}
+
+ /*
+ * Transmit thread polls the message out of the priority queue and invokes
+ * messaging service to transmit it over the socket channel
+ */
+ class PriorityMessageTransmit implements Runnable {
+ public void run() {
+ running = true;
+ while (running) {
+ try {
+ if (!transmitQ.isEmpty()) {
+ PriorityMessage pmsg = transmitQ.poll();
+ msgReadWriteService.asyncSend(pmsg.msg);
+ logger.trace("Message sent: {}", pmsg.toString());
+ }
+ Thread.sleep(10);
+ } catch (InterruptedException ie) {
+ reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+ transmitQ = null;
+ }
+ }
+
+ /*
+ * Setup and start the transmit thread
+ */
+ private void startTransmitThread() {
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
+ new Comparator<PriorityMessage>() {
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ return p2.priority - p1.priority;
+ }
+ });
+ this.transmitThread = new Thread(new PriorityMessageTransmit());
+ this.transmitThread.start();
+ }
+
+ /*
+ * Setup communication services
+ */
+ private void setupCommChannel() throws Exception {
+ this.selector = SelectorProvider.provider().openSelector();
+ this.socket.configureBlocking(false);
+ this.socket.socket().setTcpNoDelay(true);
+ this.msgReadWriteService = getMessageReadWriteService();
+ }
+
+ private void sendFirstHello() {
+ try {
+ OFMessage msg = factory.getMessage(OFType.HELLO);
+ asyncFastSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
+ private IMessageReadWrite getMessageReadWriteService() throws Exception {
+ String str = System.getProperty("secureChannelEnabled");
+ return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ?
+ new SecureMessageReadWriteService(socket, selector) :
+ new MessageReadWriteService(socket, selector);
+ }
}