X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=ea659c82e12929283d766a08e5eaa442b3e06b55;hp=5913ad0dd960036f3362c5e2325d0af5209529ab;hb=d4526d295217d6d6d60434e179a2a78c1b2eb52b;hpb=9cdfa8361e3b4d3e969821aa4de5c4862e22a025 diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 5913ad0dd9..ea659c82e1 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -9,6 +8,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; +import java.io.IOException; import java.net.SocketException; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; @@ -38,6 +38,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.IController; import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; import org.openflow.protocol.OFBarrierReply; +import org.openflow.protocol.OFBarrierRequest; import org.openflow.protocol.OFEchoReply; import org.openflow.protocol.OFError; import org.openflow.protocol.OFFeaturesReply; @@ -65,7 +66,7 @@ public class SwitchHandler implements ISwitch { private static final Logger logger = LoggerFactory .getLogger(SwitchHandler.class); private static final int SWITCH_LIVENESS_TIMER = 5000; - private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500; + private static final int switchLivenessTimeout = getSwitchLivenessTimeout(); private int MESSAGE_RESPONSE_TIMER = 2000; private String instanceName; @@ -93,9 +94,9 @@ public class SwitchHandler implements ISwitch { private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; private Integer responseTimerValue; - private PriorityBlockingQueue transmitQ; + private PriorityBlockingQueue transmitQ; private Thread transmitThread; - + private enum SwitchState { NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL( 3); @@ -116,10 +117,10 @@ public class SwitchHandler implements ISwitch { this.instanceName = name; this.thisISwitch = this; this.sid = (long) 0; - this.buffers = (int)0; - this.capabilities = (int)0; - this.tables = (byte)0; - this.actions = (int)0; + this.buffers = (int) 0; + this.capabilities = (int) 0; + this.tables = (byte) 0; + this.actions = (int) 0; this.core = core; this.socket = sc; this.factory = new BasicFactory(); @@ -136,20 +137,21 @@ public class SwitchHandler implements ISwitch { this.responseTimerValue = MESSAGE_RESPONSE_TIMER; String rTimer = System.getProperty("of.messageResponseTimer"); if (rTimer != null) { - try { - responseTimerValue = Integer.decode(rTimer); - } catch (NumberFormatException e) { - logger.warn("Invalid of.messageResponseTimer: {} use default({})", - rTimer, MESSAGE_RESPONSE_TIMER); - } + try { + responseTimerValue = Integer.decode(rTimer); + } catch (NumberFormatException e) { + logger.warn( + "Invalid of.messageResponseTimer: {} use default({})", + rTimer, MESSAGE_RESPONSE_TIMER); + } } - } + } public void start() { try { - startTransmitThread(); - setupCommChannel(); - sendFirstHello(); + startTransmitThread(); + setupCommChannel(); + sendFirstHello(); startHandlerThread(); } catch (Exception e) { reportError(e); @@ -163,7 +165,7 @@ public class SwitchHandler implements ISwitch { running = true; while (running) { try { - // wait for an incoming connection + // wait for an incoming connection selector.select(0); Iterator selectedKeys = selector .selectedKeys().iterator(); @@ -178,7 +180,7 @@ public class SwitchHandler implements ISwitch { } } } catch (Exception e) { - reportError(e); + reportError(e); } } } @@ -187,33 +189,32 @@ public class SwitchHandler implements ISwitch { } public void stop() { - running = false; - cancelSwitchTimer(); - try { - selector.wakeup(); - selector.close(); - } catch (Exception e) { - } - try { - socket.close(); - } catch (Exception e) { - } - try { - msgReadWriteService.stop(); - } catch (Exception e) { - } - executor.shutdown(); - - selector = null; - socket = null; - msgReadWriteService = null; - - if (switchHandlerThread != null) { - switchHandlerThread.interrupt(); - } - if (transmitThread != null) { - transmitThread.interrupt(); - } + running = false; + cancelSwitchTimer(); + try { + selector.wakeup(); + selector.close(); + } catch (Exception e) { + } + try { + socket.close(); + } catch (Exception e) { + } + try { + msgReadWriteService.stop(); + } catch (Exception e) { + } + executor.shutdown(); + + selector = null; + msgReadWriteService = null; + + if (switchHandlerThread != null) { + switchHandlerThread.interrupt(); + } + if (transmitThread != null) { + transmitThread.interrupt(); + } } @Override @@ -221,92 +222,132 @@ public class SwitchHandler implements ISwitch { return this.xid.incrementAndGet(); } - /** - * This method puts the message in an outgoing priority queue with normal - * priority. It will be served after high priority messages. The method - * should be used for non-critical messages such as statistics request, - * discovery packets, etc. An unique XID is generated automatically and - * inserted into the message. - * - * @param msg The OF message to be sent - * @return The XID used - */ + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. An unique XID is generated automatically and + * inserted into the message. + * + * @param msg + * The OF message to be sent + * @return The XID used + */ @Override public Integer asyncSend(OFMessage msg) { - return asyncSend(msg, getNextXid()); - } - - /** - * This method puts the message in an outgoing priority queue with normal - * priority. It will be served after high priority messages. The method - * should be used for non-critical messages such as statistics request, - * discovery packets, etc. The specified XID is inserted into the message. - * - * @param msg The OF message to be Sent - * @param xid The XID to be used in the message - * @return The XID used - */ + return asyncSend(msg, getNextXid()); + } + + private Object syncSend(OFMessage msg, int xid) { + SynchronousMessage worker = new SynchronousMessage(this, xid, msg); + messageWaitingDone.put(xid, worker); + Object result = null; + Boolean status = false; + Future submit = executor.submit(worker); + try { + result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); + messageWaitingDone.remove(xid); + if (result == null) { + // if result is null, then it means the switch can handle this + // message successfully + // convert the result into a Boolean with value true + status = true; + // logger.debug("Successfully send " + + // msg.getType().toString()); + result = status; + } else { + // if result is not null, this means the switch can't handle + // this message + // the result if OFError already + logger.debug("Send {} failed --> {}", msg.getType().toString(), + ((OFError) result).toString()); + } + return result; + } catch (Exception e) { + logger.warn("Timeout while waiting for {} reply", msg.getType() + .toString()); + // convert the result into a Boolean with value false + status = false; + result = status; + return result; + } + } + + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. The specified XID is inserted into the message. + * + * @param msg + * The OF message to be Sent + * @param xid + * The XID to be used in the message + * @return The XID used + */ @Override public Integer asyncSend(OFMessage msg, int xid) { - msg.setXid(xid); - if (transmitQ != null) { - transmitQ.add(new PriorityMessage(msg, 0)); - } + msg.setXid(xid); + if (transmitQ != null) { + transmitQ.add(new PriorityMessage(msg, 0)); + } return xid; } - /** - * This method puts the message in an outgoing priority queue with high - * priority. It will be served first before normal priority messages. The - * method should be used for critical messages such as hello, echo reply - * etc. An unique XID is generated automatically and inserted into the - * message. - * - * @param msg The OF message to be sent - * @return The XID used - */ + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. An unique XID is generated automatically and inserted into the + * message. + * + * @param msg + * The OF message to be sent + * @return The XID used + */ @Override public Integer asyncFastSend(OFMessage msg) { - return asyncFastSend(msg, getNextXid()); - } - - /** - * This method puts the message in an outgoing priority queue with high - * priority. It will be served first before normal priority messages. The - * method should be used for critical messages such as hello, echo reply - * etc. The specified XID is inserted into the message. - * - * @param msg The OF message to be sent - * @return The XID used - */ + return asyncFastSend(msg, getNextXid()); + } + + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. The specified XID is inserted into the message. + * + * @param msg + * The OF message to be sent + * @return The XID used + */ @Override public Integer asyncFastSend(OFMessage msg, int xid) { - msg.setXid(xid); - if (transmitQ != null) { - transmitQ.add(new PriorityMessage(msg, 1)); - } + msg.setXid(xid); + if (transmitQ != null) { + transmitQ.add(new PriorityMessage(msg, 1)); + } return xid; } - public void resumeSend() { + public void resumeSend() { try { - if (msgReadWriteService != null) { - msgReadWriteService.resumeSend(); - } - } catch (Exception e) { - reportError(e); - } + if (msgReadWriteService != null) { + msgReadWriteService.resumeSend(); + } + } catch (Exception e) { + reportError(e); + } } public void handleMessages() { List msgs = null; - + try { - msgs = msgReadWriteService.readMessages(); - } catch (Exception e) { - reportError(e); - } - + msgs = msgReadWriteService.readMessages(); + } catch (Exception e) { + reportError(e); + } + if (msgs == null) { logger.debug("{} is down", toString()); // the connection is down, inform core @@ -315,12 +356,6 @@ public class SwitchHandler implements ISwitch { } for (OFMessage msg : msgs) { logger.trace("Message received: {}", msg.toString()); - /* - if ((msg.getType() != OFType.ECHO_REQUEST) && - (msg.getType() != OFType.ECHO_REPLY)) { - logger.debug(msg.getType().toString() + " received from sw " + toString()); - } - */ this.lastMsgReceivedTimeStamp = System.currentTimeMillis(); OFType type = msg.getType(); switch (type) { @@ -334,8 +369,8 @@ public class SwitchHandler implements ISwitch { OFFlowMod flowMod = (OFFlowMod) factory .getMessage(OFType.FLOW_MOD); flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE) - .setOutPort(OFPort.OFPP_NONE).setLength( - (short) OFFlowMod.MINIMUM_LENGTH); + .setOutPort(OFPort.OFPP_NONE) + .setLength((short) OFFlowMod.MINIMUM_LENGTH); asyncFastSend(flowMod); this.state = SwitchState.WAIT_FEATURES_REPLY; startSwitchTimer(); @@ -352,7 +387,8 @@ public class SwitchHandler implements ISwitch { processFeaturesReply((OFFeaturesReply) msg); break; case GET_CONFIG_REPLY: - // make sure that the switch can send the whole packet to the controller + // make sure that the switch can send the whole packet to the + // controller if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) { this.state = SwitchState.OPERATIONAL; } @@ -381,18 +417,14 @@ public class SwitchHandler implements ISwitch { } private void processPortStatusMsg(OFPortStatus msg) { - //short portNumber = msg.getDesc().getPortNumber(); OFPhysicalPort port = msg.getDesc(); if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) { updatePhysicalPort(port); - //logger.debug("Port " + portNumber + " on " + toString() + " modified"); } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) { updatePhysicalPort(port); - //logger.debug("Port " + portNumber + " on " + toString() + " added"); } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE .ordinal()) { deletePhysicalPort(port); - //logger.debug("Port " + portNumber + " on " + toString() + " deleted"); } } @@ -404,14 +436,18 @@ public class SwitchHandler implements ISwitch { public void run() { try { Long now = System.currentTimeMillis(); - if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) { + if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) { if (probeSent) { - // switch failed to respond to our probe, consider it down - logger.warn("{} is idle for too long, disconnect", toString()); + // switch failed to respond to our probe, consider + // it down + logger.warn("{} is idle for too long, disconnect", + toString()); reportSwitchStateChange(false); } else { // send a probe to see if the switch is still alive - //logger.debug("Send idle probe (Echo Request) to " + switchName()); + logger.debug( + "Send idle probe (Echo Request) to {}", + toString()); probeSent = true; OFMessage echo = factory .getMessage(OFType.ECHO_REQUEST); @@ -425,7 +461,7 @@ public class SwitchHandler implements ISwitch { asyncFastSend(request); } else { if (state == SwitchState.WAIT_CONFIG_REPLY) { - // send another config request + // send another config request OFSetConfig config = (OFSetConfig) factory .getMessage(OFType.SET_CONFIG); config.setMissSendLength((short) 0xffff) @@ -451,20 +487,20 @@ public class SwitchHandler implements ISwitch { } private void reportError(Exception e) { - if (e instanceof AsynchronousCloseException || - e instanceof InterruptedException || - e instanceof SocketException) { - logger.debug("Caught exception {}", e.getMessage()); - } else { - logger.warn("Caught exception {}", e.getMessage()); - } + if (e instanceof AsynchronousCloseException + || e instanceof InterruptedException + || e instanceof SocketException || e instanceof IOException) { + logger.debug("Caught exception {}", e.getMessage()); + } else { + logger.warn("Caught exception ", e); + } // notify core of this error event and disconnect the switch ((Controller) core).takeSwitchEventError(this); } private void reportSwitchStateChange(boolean added) { if (added) { - ((Controller) core).takeSwtichEventAdd(this); + ((Controller) core).takeSwitchEventAdd(this); } else { ((Controller) core).takeSwitchEventDelete(this); } @@ -492,7 +528,8 @@ public class SwitchHandler implements ISwitch { config.setMissSendLength((short) 0xffff).setLengthU( OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); - // send config request to make sure the switch can handle the set config + // send config request to make sure the switch can handle the set + // config OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST); asyncFastSend(getConfig); this.state = SwitchState.WAIT_CONFIG_REPLY; @@ -505,8 +542,7 @@ public class SwitchHandler implements ISwitch { Short portNumber = port.getPortNumber(); physicalPorts.put(portNumber, port); portBandwidth - .put( - portNumber, + .put(portNumber, port.getCurrentFeatures() & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD @@ -519,7 +555,7 @@ public class SwitchHandler implements ISwitch { .getValue() | OFPortFeatures.OFPPF_1GB_HD .getValue() | OFPortFeatures.OFPPF_10GB_FD - .getValue())); + .getValue())); } private void deletePhysicalPort(OFPhysicalPort port) { @@ -535,11 +571,16 @@ public class SwitchHandler implements ISwitch { @Override public String toString() { - return ("[" - + this.socket.toString() - + " SWID " - + (isOperational() ? HexString.toHexString(this.sid) - : "unkbown") + "]"); + try { + return ("Switch:" + + socket.getRemoteAddress().toString().split("/")[1] + + " SWID:" + (isOperational() ? HexString + .toHexString(this.sid) : "unknown")); + } catch (Exception e) { + return (isOperational() ? HexString.toHexString(this.sid) + : "unknown"); + } + } @Override @@ -559,8 +600,7 @@ public class SwitchHandler implements ISwitch { Future submit = executor.submit(worker); Object result = null; try { - result = submit - .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS); + result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); return result; } catch (Exception e) { logger.warn("Timeout while waiting for {} replies", req.getType()); @@ -571,41 +611,13 @@ public class SwitchHandler implements ISwitch { @Override public Object syncSend(OFMessage msg) { - Integer xid = getNextXid(); - SynchronousMessage worker = new SynchronousMessage(this, xid, msg); - messageWaitingDone.put(xid, worker); - Object result = null; - Boolean status = false; - Future submit = executor.submit(worker); - try { - result = submit - .get(responseTimerValue, TimeUnit.MILLISECONDS); - messageWaitingDone.remove(xid); - if (result == null) { - // if result is null, then it means the switch can handle this message successfully - // convert the result into a Boolean with value true - status = true; - //logger.debug("Successfully send " + msg.getType().toString()); - result = status; - } else { - // if result is not null, this means the switch can't handle this message - // the result if OFError already - logger.debug("Send {} failed --> {}", - msg.getType().toString(), ((OFError) result).toString()); - } - return result; - } catch (Exception e) { - logger.warn("Timeout while waiting for {} reply", msg.getType().toString()); - // convert the result into a Boolean with value false - status = false; - result = status; - return result; - } + int xid = getNextXid(); + return syncSend(msg, xid); } /* - * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message, - * wake up associated task so that it can continue + * Either a BarrierReply or a OFError is received. If this is a reply for an + * outstanding sync message, wake up associated task so that it can continue */ private void processBarrierReply(OFBarrierReply msg) { Integer xid = msg.getXid(); @@ -626,7 +638,8 @@ public class SwitchHandler implements ISwitch { xid = errorMsg.getXid(); } /* - * the error can be a reply to a synchronous message or to a statistic request message + * the error can be a reply to a synchronous message or to a statistic + * request message */ Callable worker = messageWaitingDone.remove(xid); if (worker == null) { @@ -653,7 +666,7 @@ public class SwitchHandler implements ISwitch { worker.wakeup(); } } - + @Override public Map getPhysicalPorts() { return this.physicalPorts; @@ -678,12 +691,12 @@ public class SwitchHandler implements ISwitch { public Byte getTables() { return this.tables; } - + @Override public Integer getActions() { return this.actions; } - + @Override public Integer getCapabilities() { return this.capabilities; @@ -732,68 +745,103 @@ public class SwitchHandler implements ISwitch { return result; } - /* - * Transmit thread polls the message out of the priority queue and invokes - * messaging service to transmit it over the socket channel - */ + /* + * Transmit thread polls the message out of the priority queue and invokes + * messaging service to transmit it over the socket channel + */ class PriorityMessageTransmit implements Runnable { public void run() { running = true; while (running) { - try { - if (!transmitQ.isEmpty()) { - PriorityMessage pmsg = transmitQ.poll(); - msgReadWriteService.asyncSend(pmsg.msg); - logger.trace("Message sent: {}", pmsg.toString()); - } - Thread.sleep(10); - } catch (InterruptedException ie) { - reportError(new InterruptedException("PriorityMessageTransmit thread interrupted")); - } catch (Exception e) { - reportError(e); - } + try { + if (!transmitQ.isEmpty()) { + PriorityMessage pmsg = transmitQ.poll(); + msgReadWriteService.asyncSend(pmsg.msg); + logger.trace("Message sent: {}", pmsg.toString()); + } + Thread.sleep(10); + } catch (InterruptedException ie) { + reportError(new InterruptedException( + "PriorityMessageTransmit thread interrupted")); + } catch (Exception e) { + reportError(e); + } } - transmitQ = null; + transmitQ = null; } } /* * Setup and start the transmit thread */ - private void startTransmitThread() { - this.transmitQ = new PriorityBlockingQueue(11, - new Comparator() { - public int compare(PriorityMessage p1, PriorityMessage p2) { - return p2.priority - p1.priority; - } - }); + private void startTransmitThread() { + this.transmitQ = new PriorityBlockingQueue(11, + new Comparator() { + public int compare(PriorityMessage p1, PriorityMessage p2) { + if (p2.priority != p1.priority) { + return p2.priority - p1.priority; + } else { + return (p2.seqNum < p1.seqNum) ? 1 : -1; + } + } + }); this.transmitThread = new Thread(new PriorityMessageTransmit()); this.transmitThread.start(); } - + /* * Setup communication services */ private void setupCommChannel() throws Exception { this.selector = SelectorProvider.provider().openSelector(); this.socket.configureBlocking(false); - this.socket.socket().setTcpNoDelay(true); + this.socket.socket().setTcpNoDelay(true); this.msgReadWriteService = getMessageReadWriteService(); } private void sendFirstHello() { - try { - OFMessage msg = factory.getMessage(OFType.HELLO); - asyncFastSend(msg); - } catch (Exception e) { - reportError(e); - } - } - + try { + OFMessage msg = factory.getMessage(OFType.HELLO); + asyncFastSend(msg); + } catch (Exception e) { + reportError(e); + } + } + private IMessageReadWrite getMessageReadWriteService() throws Exception { - String str = System.getProperty("secureChannelEnabled"); - return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? - new SecureMessageReadWriteService(socket, selector) : - new MessageReadWriteService(socket, selector); + String str = System.getProperty("secureChannelEnabled"); + return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService( + socket, selector) : new MessageReadWriteService(socket, + selector); + } + + /** + * Sends synchronous Barrier message + */ + @Override + public Object sendBarrierMessage() { + OFBarrierRequest barrierMsg = new OFBarrierRequest(); + return syncSend(barrierMsg); + } + + /** + * This method returns the switch liveness timeout value. If controller did + * not receive any message from the switch for such a long period, + * controller will tear down the connection to the switch. + * + * @return The timeout value + */ + private static int getSwitchLivenessTimeout() { + String timeout = System.getProperty("of.switchLivenessTimeout"); + int rv = 60500; + + try { + if (timeout != null) { + rv = Integer.parseInt(timeout); + } + } catch (Exception e) { + } + + return rv; } }