X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=45203758bdd45e13486936faab73c27f947f547d;hp=8881fb53640caa58e692a08bdbdbb7bbb80916e6;hb=f2344025e13ee3e51561bb171800d240f9f91e9a;hpb=48c79f7c49ed86f40970a30f85ccb096370d636e diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 8881fb5364..45203758bd 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -9,12 +9,13 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; -import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -28,11 +29,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.protocol_plugin.openflow.core.IController; import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; +import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; import org.openflow.protocol.OFBarrierReply; import org.openflow.protocol.OFEchoReply; import org.openflow.protocol.OFError; @@ -63,7 +66,6 @@ public class SwitchHandler implements ISwitch { private static final int SWITCH_LIVENESS_TIMER = 5000; private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500; private int MESSAGE_RESPONSE_TIMER = 2000; - private static final int bufferSize = 1024 * 1024; private String instanceName; private ISwitch thisISwitch; @@ -74,10 +76,7 @@ public class SwitchHandler implements ISwitch { private Byte tables; private Integer actions; private Selector selector; - private SelectionKey clientSelectionKey; private SocketChannel socket; - private ByteBuffer inBuffer; - private ByteBuffer outBuffer; private BasicFactory factory; private AtomicInteger xid; private SwitchState state; @@ -90,9 +89,12 @@ public class SwitchHandler implements ISwitch { private ExecutorService executor; private ConcurrentHashMap> messageWaitingDone; private boolean running; + private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; private Integer responseTimerValue; - + private PriorityBlockingQueue transmitQ; + private Thread transmitThread; + private enum SwitchState { NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL( 3); @@ -130,44 +132,37 @@ public class SwitchHandler implements ISwitch { this.periodicTimer = null; this.executor = Executors.newFixedThreadPool(4); this.messageWaitingDone = new ConcurrentHashMap>(); - this.inBuffer = ByteBuffer.allocateDirect(bufferSize); - this.outBuffer = ByteBuffer.allocateDirect(bufferSize); this.responseTimerValue = MESSAGE_RESPONSE_TIMER; String rTimer = System.getProperty("of.messageResponseTimer"); if (rTimer != null) { - try { - responseTimerValue = Integer.decode(rTimer); - } catch (NumberFormatException e) { - logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default(" - + MESSAGE_RESPONSE_TIMER+ ")"); - } + try { + responseTimerValue = Integer.decode(rTimer); + } catch (NumberFormatException e) { + logger.warn("Invalid of.messageResponseTimer: {} use default({})", + rTimer, MESSAGE_RESPONSE_TIMER); + } } - } + } public void start() { try { - this.selector = SelectorProvider.provider().openSelector(); - this.socket.configureBlocking(false); - this.socket.socket().setTcpNoDelay(true); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_READ); + startTransmitThread(); + setupCommChannel(); + sendFirstHello(); startHandlerThread(); } catch (Exception e) { reportError(e); - return; } } private void startHandlerThread() { - OFMessage msg = factory.getMessage(OFType.HELLO); - asyncSend(msg); switchHandlerThread = new Thread(new Runnable() { @Override public void run() { running = true; while (running) { try { - // wait for an incoming connection + // wait for an incoming connection selector.select(0); Iterator selectedKeys = selector .selectedKeys().iterator(); @@ -195,7 +190,7 @@ public class SwitchHandler implements ISwitch { running = false; selector.wakeup(); cancelSwitchTimer(); - this.clientSelectionKey.cancel(); + this.selector.close(); this.socket.close(); executor.shutdown(); } catch (Exception e) { @@ -209,77 +204,94 @@ public class SwitchHandler implements ISwitch { return this.xid.incrementAndGet(); } + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. An unique XID is generated automatically and + * inserted into the message. + * + * @param msg The OF message to be sent + * @return The XID used + */ @Override public Integer asyncSend(OFMessage msg) { - return asyncSend(msg, getNextXid()); + return asyncSend(msg, getNextXid()); + } + + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. The specified XID is inserted into the message. + * + * @param msg The OF message to be Sent + * @param xid The XID to be used in the message + * @return The XID used + */ + @Override + public Integer asyncSend(OFMessage msg, int xid) { + msg.setXid(xid); + transmitQ.add(new PriorityMessage(msg, 0)); + return xid; } + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. An unique XID is generated automatically and inserted into the + * message. + * + * @param msg The OF message to be sent + * @return The XID used + */ @Override - public Integer asyncSend(OFMessage msg, int xid) { - synchronized (outBuffer) { - /* - if ((msg.getType() != OFType.ECHO_REQUEST) && - (msg.getType() != OFType.ECHO_REPLY)) { - logger.debug("sending " + msg.getType().toString() + " to " + toString()); - } - */ - msg.setXid(xid); - int msgLen = msg.getLengthU(); - if (outBuffer.remaining() < msgLen) { - // increase the buffer size so that it can contain this message - ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer - .capacity() - + msgLen); - outBuffer.flip(); - newBuffer.put(outBuffer); - outBuffer = newBuffer; - } - msg.writeTo(outBuffer); - outBuffer.flip(); - try { - socket.write(outBuffer); - outBuffer.compact(); - if (outBuffer.position() > 0) { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_WRITE, this); - } - logger.trace("Message sent: " + msg.toString()); - } catch (Exception e) { - reportError(e); - } - } + public Integer asyncFastSend(OFMessage msg) { + return asyncFastSend(msg, getNextXid()); + } + + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. The specified XID is inserted into the message. + * + * @param msg The OF message to be sent + * @return The XID used + */ + @Override + public Integer asyncFastSend(OFMessage msg, int xid) { + msg.setXid(xid); + transmitQ.add(new PriorityMessage(msg, 1)); return xid; } - public void resumeSend() { - synchronized (outBuffer) { - try { - outBuffer.flip(); - socket.write(outBuffer); - outBuffer.compact(); - if (outBuffer.position() > 0) { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_WRITE, this); - } else { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_READ, this); - } - } catch (Exception e) { - reportError(e); - } - } + public void resumeSend() { + try { + msgReadWriteService.resumeSend(); + } catch (Exception e) { + reportError(e); + } } public void handleMessages() { - List msgs = readMessages(); + List msgs = null; + + try { + msgs = msgReadWriteService.readMessages(); + } catch (Exception e) { + reportError(e); + } + if (msgs == null) { - logger.debug(toString() + " is down"); + logger.debug("{} is down", toString()); // the connection is down, inform core reportSwitchStateChange(false); return; } for (OFMessage msg : msgs) { - logger.trace("Message received: " + msg.toString()); + logger.trace("Message received: {}", msg.toString()); /* if ((msg.getType() != OFType.ECHO_REQUEST) && (msg.getType() != OFType.ECHO_REPLY)) { @@ -293,7 +305,7 @@ public class SwitchHandler implements ISwitch { // send feature request OFMessage featureRequest = factory .getMessage(OFType.FEATURES_REQUEST); - asyncSend(featureRequest); + asyncFastSend(featureRequest); // delete all pre-existing flows OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL); OFFlowMod flowMod = (OFFlowMod) factory @@ -301,14 +313,14 @@ public class SwitchHandler implements ISwitch { flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE) .setOutPort(OFPort.OFPP_NONE).setLength( (short) OFFlowMod.MINIMUM_LENGTH); - asyncSend(flowMod); + asyncFastSend(flowMod); this.state = SwitchState.WAIT_FEATURES_REPLY; startSwitchTimer(); break; case ECHO_REQUEST: OFEchoReply echoReply = (OFEchoReply) factory .getMessage(OFType.ECHO_REPLY); - asyncSend(echoReply); + asyncFastSend(echoReply); break; case ECHO_REPLY: this.probeSent = false; @@ -362,28 +374,6 @@ public class SwitchHandler implements ISwitch { } - private List readMessages() { - List msgs = null; - int bytesRead; - try { - bytesRead = socket.read(inBuffer); - } catch (Exception e) { - reportError(e); - return null; - } - if (bytesRead == -1) { - return null; - } - inBuffer.flip(); - msgs = factory.parseMessages(inBuffer); - if (inBuffer.hasRemaining()) { - inBuffer.compact(); - } else { - inBuffer.clear(); - } - return msgs; - } - private void startSwitchTimer() { this.periodicTimer = new Timer(); this.periodicTimer.scheduleAtFixedRate(new TimerTask() { @@ -394,8 +384,7 @@ public class SwitchHandler implements ISwitch { if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) { if (probeSent) { // switch failed to respond to our probe, consider it down - logger.warn(toString() - + " is idle for too long, disconnect"); + logger.warn("{} is idle for too long, disconnect", toString()); reportSwitchStateChange(false); } else { // send a probe to see if the switch is still alive @@ -403,14 +392,14 @@ public class SwitchHandler implements ISwitch { probeSent = true; OFMessage echo = factory .getMessage(OFType.ECHO_REQUEST); - asyncSend(echo); + asyncFastSend(echo); } } else { if (state == SwitchState.WAIT_FEATURES_REPLY) { // send another features request OFMessage request = factory .getMessage(OFType.FEATURES_REQUEST); - asyncSend(request); + asyncFastSend(request); } else { if (state == SwitchState.WAIT_CONFIG_REPLY) { // send another config request @@ -418,10 +407,10 @@ public class SwitchHandler implements ISwitch { .getMessage(OFType.SET_CONFIG); config.setMissSendLength((short) 0xffff) .setLengthU(OFSetConfig.MINIMUM_LENGTH); - asyncSend(config); + asyncFastSend(config); OFMessage getConfig = factory .getMessage(OFType.GET_CONFIG_REQUEST); - asyncSend(getConfig); + asyncFastSend(getConfig); } } } @@ -439,8 +428,8 @@ public class SwitchHandler implements ISwitch { } private void reportError(Exception e) { - //logger.error(toString() + " caught Error " + e.toString()); - // notify core of this error event + logger.debug("Caught exception ", e); + // notify core of this error event and disconnect the switch ((Controller) core).takeSwitchEventError(this); } @@ -473,10 +462,10 @@ public class SwitchHandler implements ISwitch { .getMessage(OFType.SET_CONFIG); config.setMissSendLength((short) 0xffff).setLengthU( OFSetConfig.MINIMUM_LENGTH); - asyncSend(config); + asyncFastSend(config); // send config request to make sure the switch can handle the set config OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST); - asyncSend(getConfig); + asyncFastSend(getConfig); this.state = SwitchState.WAIT_CONFIG_REPLY; // inform core that a new switch is now operational reportSwitchStateChange(true); @@ -545,8 +534,7 @@ public class SwitchHandler implements ISwitch { .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS); return result; } catch (Exception e) { - logger.warn("Timeout while waiting for " + req.getType() - + " replies"); + logger.warn("Timeout while waiting for {} replies", req.getType()); result = null; // to indicate timeout has occurred return result; } @@ -573,13 +561,12 @@ public class SwitchHandler implements ISwitch { } else { // if result is not null, this means the switch can't handle this message // the result if OFError already - logger.debug("Send " + msg.getType().toString() - + " failed --> " + ((OFError) result).toString()); + logger.debug("Send {} failed --> {}", + msg.getType().toString(), ((OFError) result).toString()); } return result; } catch (Exception e) { - logger.warn("Timeout while waiting for " + msg.getType().toString() - + " reply"); + logger.warn("Timeout while waiting for {} reply", msg.getType().toString()); // convert the result into a Boolean with value false status = false; result = status; @@ -637,7 +624,7 @@ public class SwitchHandler implements ISwitch { worker.wakeup(); } } - + @Override public Map getPhysicalPorts() { return this.physicalPorts; @@ -715,4 +702,65 @@ public class SwitchHandler implements ISwitch { } return result; } + + /* + * Transmit thread polls the message out of the priority queue and invokes + * messaging service to transmit it over the socket channel + */ + class PriorityMessageTransmit implements Runnable { + public void run() { + while (true) { + try { + if (!transmitQ.isEmpty()) { + PriorityMessage pmsg = transmitQ.poll(); + msgReadWriteService.asyncSend(pmsg.msg); + logger.trace("Message sent: {}", pmsg.toString()); + } + Thread.sleep(10); + } catch (Exception e) { + reportError(e); + } + } + } + } + + /* + * Setup and start the transmit thread + */ + private void startTransmitThread() { + this.transmitQ = new PriorityBlockingQueue(11, + new Comparator() { + public int compare(PriorityMessage p1, PriorityMessage p2) { + return p2.priority - p1.priority; + } + }); + this.transmitThread = new Thread(new PriorityMessageTransmit()); + this.transmitThread.start(); + } + + /* + * Setup communication services + */ + private void setupCommChannel() throws Exception { + this.selector = SelectorProvider.provider().openSelector(); + this.socket.configureBlocking(false); + this.socket.socket().setTcpNoDelay(true); + this.msgReadWriteService = getMessageReadWriteService(); + } + + private void sendFirstHello() { + try { + OFMessage msg = factory.getMessage(OFType.HELLO); + asyncFastSend(msg); + } catch (Exception e) { + reportError(e); + } + } + + private IMessageReadWrite getMessageReadWriteService() throws Exception { + String str = System.getProperty("secureChannelEnabled"); + return ((str != null) && (str.equalsIgnoreCase("true"))) ? + new SecureMessageReadWriteService(socket, selector) : + new MessageReadWriteService(socket, selector); + } }