X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=91909d20f53a5bde5adea2b7020c38a78406f080;hp=57098ae3c6fe2f61429d72a7f10aac9cab5d8de6;hb=c2dfbc0108cac537083b1f85e5970f95810bb4c2;hpb=40baffab6be9315bd15c3d59dfd6b8e6685e659a diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 57098ae3c6..91909d20f5 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -8,9 +8,6 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; -import java.io.IOException; -import java.net.SocketException; -import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -31,12 +28,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.protocol_plugin.openflow.core.IController; -import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; +import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.openflow.protocol.OFBarrierReply; import org.openflow.protocol.OFBarrierRequest; import org.openflow.protocol.OFEchoReply; @@ -63,33 +61,32 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SwitchHandler implements ISwitch { - private static final Logger logger = LoggerFactory - .getLogger(SwitchHandler.class); + private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class); private static final int SWITCH_LIVENESS_TIMER = 5000; private static final int switchLivenessTimeout = getSwitchLivenessTimeout(); - private int MESSAGE_RESPONSE_TIMER = 2000; + private final int MESSAGE_RESPONSE_TIMER = 2000; - private String instanceName; - private ISwitch thisISwitch; - private IController core; + private final String instanceName; + private final ISwitch thisISwitch; + private final IController core; private Long sid; private Integer buffers; private Integer capabilities; private Byte tables; private Integer actions; private Selector selector; - private SocketChannel socket; - private BasicFactory factory; - private AtomicInteger xid; + private final SocketChannel socket; + private final BasicFactory factory; + private final AtomicInteger xid; private SwitchState state; private Timer periodicTimer; - private Map physicalPorts; - private Map portBandwidth; - private Date connectedDate; + private final Map physicalPorts; + private final Map portBandwidth; + private final Date connectedDate; private Long lastMsgReceivedTimeStamp; private Boolean probeSent; - private ExecutorService executor; - private ConcurrentHashMap> messageWaitingDone; + private final ExecutorService executor; + private final ConcurrentHashMap> messageWaitingDone; private boolean running; private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; @@ -98,8 +95,7 @@ public class SwitchHandler implements ISwitch { private Thread transmitThread; private enum SwitchState { - NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL( - 3); + NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3); private int value; @@ -140,9 +136,7 @@ public class SwitchHandler implements ISwitch { try { responseTimerValue = Integer.decode(rTimer); } catch (NumberFormatException e) { - logger.warn( - "Invalid of.messageResponseTimer: {} use default({})", - rTimer, MESSAGE_RESPONSE_TIMER); + logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER); } } } @@ -167,8 +161,7 @@ public class SwitchHandler implements ISwitch { try { // wait for an incoming connection selector.select(0); - Iterator selectedKeys = selector - .selectedKeys().iterator(); + Iterator selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey skey = selectedKeys.next(); selectedKeys.remove(); @@ -188,7 +181,9 @@ public class SwitchHandler implements ISwitch { switchHandlerThread.start(); } - public void stop() { + private void stopInternal() { + logger.debug("{} receives stop signal", + (isOperational() ? HexString.toHexString(sid) : "unknown")); running = false; cancelSwitchTimer(); try { @@ -204,10 +199,14 @@ public class SwitchHandler implements ISwitch { msgReadWriteService.stop(); } catch (Exception e) { } - executor.shutdown(); + logger.debug("executor shutdown now"); + executor.shutdownNow(); - selector = null; msgReadWriteService = null; + } + + public void stop() { + stopInternal(); if (switchHandlerThread != null) { switchHandlerThread.interrupt(); @@ -228,7 +227,7 @@ public class SwitchHandler implements ISwitch { * should be used for non-critical messages such as statistics request, * discovery packets, etc. An unique XID is generated automatically and * inserted into the message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -239,38 +238,7 @@ public class SwitchHandler implements ISwitch { } private Object syncSend(OFMessage msg, int xid) { - SynchronousMessage worker = new SynchronousMessage(this, xid, msg); - messageWaitingDone.put(xid, worker); - Object result = null; - Boolean status = false; - Future submit = executor.submit(worker); - try { - result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); - messageWaitingDone.remove(xid); - if (result == null) { - // if result is null, then it means the switch can handle this - // message successfully - // convert the result into a Boolean with value true - status = true; - // logger.debug("Successfully send " + - // msg.getType().toString()); - result = status; - } else { - // if result is not null, this means the switch can't handle - // this message - // the result if OFError already - logger.debug("Send {} failed --> {}", msg.getType().toString(), - ((OFError) result).toString()); - } - return result; - } catch (Exception e) { - logger.warn("Timeout while waiting for {} reply", msg.getType() - .toString()); - // convert the result into a Boolean with value false - status = false; - result = status; - return result; - } + return syncMessageInternal(msg, xid, true); } /** @@ -278,7 +246,7 @@ public class SwitchHandler implements ISwitch { * priority. It will be served after high priority messages. The method * should be used for non-critical messages such as statistics request, * discovery packets, etc. The specified XID is inserted into the message. - * + * * @param msg * The OF message to be Sent * @param xid @@ -300,7 +268,7 @@ public class SwitchHandler implements ISwitch { * method should be used for critical messages such as hello, echo reply * etc. An unique XID is generated automatically and inserted into the * message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -315,7 +283,7 @@ public class SwitchHandler implements ISwitch { * priority. It will be served first before normal priority messages. The * method should be used for critical messages such as hello, echo reply * etc. The specified XID is inserted into the message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -339,46 +307,84 @@ public class SwitchHandler implements ISwitch { } } + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. If the input xid is not null, the specified xid is + * inserted into the message. Otherwise, an unique xid is generated + * automatically and inserted into the message. + * + * @param msg + * Message to be sent + * @param xid + * Message xid + */ + private void asyncSendNow(OFMessage msg, Integer xid) { + if (xid == null) { + xid = getNextXid(); + } + msg.setXid(xid); + + asyncSendNow(msg); + } + + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. + * + * @param msg + * Message to be sent + */ + private void asyncSendNow(OFMessage msg) { + if (msgReadWriteService == null) { + logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg); + return; + } + + try { + msgReadWriteService.asyncSend(msg); + } catch (Exception e) { + reportError(e); + } + } + public void handleMessages() { List msgs = null; try { - msgs = msgReadWriteService.readMessages(); + if (msgReadWriteService != null) { + msgs = msgReadWriteService.readMessages(); + } } catch (Exception e) { reportError(e); } if (msgs == null) { - logger.debug("{} is down", toString()); - // the connection is down, inform core + logger.info("{} is down", this); reportSwitchStateChange(false); return; } for (OFMessage msg : msgs) { - logger.trace("Message received: {}", msg.toString()); + logger.trace("Message received: {}", msg); this.lastMsgReceivedTimeStamp = System.currentTimeMillis(); OFType type = msg.getType(); switch (type) { case HELLO: // send feature request - OFMessage featureRequest = factory - .getMessage(OFType.FEATURES_REQUEST); + OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST); asyncFastSend(featureRequest); // delete all pre-existing flows OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL); - OFFlowMod flowMod = (OFFlowMod) factory - .getMessage(OFType.FLOW_MOD); - flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE) - .setOutPort(OFPort.OFPP_NONE) + OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD); + flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE) .setLength((short) OFFlowMod.MINIMUM_LENGTH); asyncFastSend(flowMod); this.state = SwitchState.WAIT_FEATURES_REPLY; startSwitchTimer(); break; case ECHO_REQUEST: - OFEchoReply echoReply = (OFEchoReply) factory - .getMessage(OFType.ECHO_REPLY); - asyncFastSend(echoReply); + OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY); + // respond immediately + asyncSendNow(echoReply, msg.getXid()); break; case ECHO_REPLY: this.probeSent = false; @@ -422,8 +428,7 @@ public class SwitchHandler implements ISwitch { updatePhysicalPort(port); } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) { updatePhysicalPort(port); - } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE - .ordinal()) { + } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) { deletePhysicalPort(port); } @@ -440,35 +445,29 @@ public class SwitchHandler implements ISwitch { if (probeSent) { // switch failed to respond to our probe, consider // it down - logger.warn("{} is idle for too long, disconnect", - toString()); + logger.warn("{} sid {} is idle for too long, disconnect", socket.socket() + .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown" + : HexString.toHexString(sid)); reportSwitchStateChange(false); } else { // send a probe to see if the switch is still alive - logger.debug( - "Send idle probe (Echo Request) to {}", - toString()); + logger.debug("Send idle probe (Echo Request) to {}", this); probeSent = true; - OFMessage echo = factory - .getMessage(OFType.ECHO_REQUEST); + OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST); asyncFastSend(echo); } } else { if (state == SwitchState.WAIT_FEATURES_REPLY) { // send another features request - OFMessage request = factory - .getMessage(OFType.FEATURES_REQUEST); + OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST); asyncFastSend(request); } else { if (state == SwitchState.WAIT_CONFIG_REPLY) { // send another config request - OFSetConfig config = (OFSetConfig) factory - .getMessage(OFType.SET_CONFIG); - config.setMissSendLength((short) 0xffff) - .setLengthU(OFSetConfig.MINIMUM_LENGTH); + OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG); + config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); - OFMessage getConfig = factory - .getMessage(OFType.GET_CONFIG_REQUEST); + OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST); asyncFastSend(getConfig); } } @@ -487,15 +486,18 @@ public class SwitchHandler implements ISwitch { } private void reportError(Exception e) { - if (e instanceof AsynchronousCloseException - || e instanceof InterruptedException - || e instanceof SocketException || e instanceof IOException) { - logger.debug("Caught exception {}", e.getMessage()); - } else { - logger.warn("Caught exception ", e); + if (!running) { + logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(), + (isOperational() ? HexString.toHexString(sid) : "unknown")); + return; } + logger.debug("Caught exception: ", e); + // notify core of this error event and disconnect the switch ((Controller) core).takeSwitchEventError(this); + + // clean up some internal states immediately + stopInternal(); } private void reportSwitchStateChange(boolean added) { @@ -523,10 +525,8 @@ public class SwitchHandler implements ISwitch { updatePhysicalPort(port); } // config the switch to send full data packet - OFSetConfig config = (OFSetConfig) factory - .getMessage(OFType.SET_CONFIG); - config.setMissSendLength((short) 0xffff).setLengthU( - OFSetConfig.MINIMUM_LENGTH); + OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG); + config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); // send config request to make sure the switch can handle the set // config @@ -544,17 +544,11 @@ public class SwitchHandler implements ISwitch { portBandwidth .put(portNumber, port.getCurrentFeatures() - & (OFPortFeatures.OFPPF_10MB_FD.getValue() - | OFPortFeatures.OFPPF_10MB_HD - .getValue() - | OFPortFeatures.OFPPF_100MB_FD - .getValue() - | OFPortFeatures.OFPPF_100MB_HD - .getValue() - | OFPortFeatures.OFPPF_1GB_FD - .getValue() - | OFPortFeatures.OFPPF_1GB_HD - .getValue() | OFPortFeatures.OFPPF_10GB_FD + & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue() + | OFPortFeatures.OFPPF_100MB_FD.getValue() + | OFPortFeatures.OFPPF_100MB_HD.getValue() + | OFPortFeatures.OFPPF_1GB_FD.getValue() + | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD .getValue())); } @@ -572,13 +566,10 @@ public class SwitchHandler implements ISwitch { @Override public String toString() { try { - return ("Switch:" - + socket.socket().getRemoteSocketAddress().toString().split("/")[1] - + " SWID:" + (isOperational() ? HexString + return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString .toHexString(this.sid) : "unknown")); } catch (Exception e) { - return (isOperational() ? HexString.toHexString(this.sid) - : "unknown"); + return (isOperational() ? HexString.toHexString(this.sid) : "unknown"); } } @@ -597,20 +588,32 @@ public class SwitchHandler implements ISwitch { int xid = getNextXid(); StatisticsCollector worker = new StatisticsCollector(this, xid, req); messageWaitingDone.put(xid, worker); - Future submit = executor.submit(worker); + Future submit; Object result = null; + try { + submit = executor.submit(worker); + } catch (RejectedExecutionException re) { + messageWaitingDone.remove(xid); + return result; + } try { result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); return result; } catch (Exception e) { - logger.warn("Timeout while waiting for {} replies", req.getType()); + logger.warn("Timeout while waiting for {} replies from {}", + req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown")); result = null; // to indicate timeout has occurred + worker.wakeup(); return result; } } @Override public Object syncSend(OFMessage msg) { + if (!running) { + logger.debug("Switch is going down, ignore syncSend"); + return null; + } int xid = getNextXid(); return syncSend(msg, xid); } @@ -621,8 +624,7 @@ public class SwitchHandler implements ISwitch { */ private void processBarrierReply(OFBarrierReply msg) { Integer xid = msg.getXid(); - SynchronousMessage worker = (SynchronousMessage) messageWaitingDone - .remove(xid); + SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid); if (worker == null) { return; } @@ -654,8 +656,7 @@ public class SwitchHandler implements ISwitch { private void processStatsReply(OFStatisticsReply reply) { Integer xid = reply.getXid(); - StatisticsCollector worker = (StatisticsCollector) messageWaitingDone - .get(xid); + StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid); if (worker == null) { return; } @@ -725,8 +726,7 @@ public class SwitchHandler implements ISwitch { if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) { return false; } - if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK - .getValue()) { + if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) { return false; } return true; @@ -750,19 +750,21 @@ public class SwitchHandler implements ISwitch { * messaging service to transmit it over the socket channel */ class PriorityMessageTransmit implements Runnable { + @Override public void run() { running = true; while (running) { try { - if (!transmitQ.isEmpty()) { - PriorityMessage pmsg = transmitQ.poll(); - msgReadWriteService.asyncSend(pmsg.msg); - logger.trace("Message sent: {}", pmsg.toString()); + PriorityMessage pmsg = transmitQ.take(); + msgReadWriteService.asyncSend(pmsg.msg); + /* + * If syncReply is set to true, wait for the response back. + */ + if (pmsg.syncReply) { + syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); } - Thread.sleep(10); } catch (InterruptedException ie) { - reportError(new InterruptedException( - "PriorityMessageTransmit thread interrupted")); + reportError(new InterruptedException("PriorityMessageTransmit thread interrupted")); } catch (Exception e) { reportError(e); } @@ -775,16 +777,16 @@ public class SwitchHandler implements ISwitch { * Setup and start the transmit thread */ private void startTransmitThread() { - this.transmitQ = new PriorityBlockingQueue(11, - new Comparator() { - public int compare(PriorityMessage p1, PriorityMessage p2) { - if (p2.priority != p1.priority) { - return p2.priority - p1.priority; - } else { - return (p2.seqNum < p1.seqNum) ? 1 : -1; - } - } - }); + this.transmitQ = new PriorityBlockingQueue(11, new Comparator() { + @Override + public int compare(PriorityMessage p1, PriorityMessage p2) { + if (p2.priority != p1.priority) { + return p2.priority - p1.priority; + } else { + return (p2.seqNum < p1.seqNum) ? 1 : -1; + } + } + }); this.transmitThread = new Thread(new PriorityMessageTransmit()); this.transmitThread.start(); } @@ -810,25 +812,45 @@ public class SwitchHandler implements ISwitch { private IMessageReadWrite getMessageReadWriteService() throws Exception { String str = System.getProperty("secureChannelEnabled"); - return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService( - socket, selector) : new MessageReadWriteService(socket, - selector); + return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket, + selector) : new MessageReadWriteService(socket, selector); } /** - * Sends synchronous Barrier message + * Send Barrier message synchronously. The caller will be blocked until the + * Barrier reply is received. */ @Override - public Object sendBarrierMessage() { + public Object syncSendBarrierMessage() { OFBarrierRequest barrierMsg = new OFBarrierRequest(); return syncSend(barrierMsg); } + /** + * Send Barrier message asynchronously. The caller is not blocked. The + * Barrier message will be sent in a transmit thread which will be blocked + * until the Barrier reply is received. + */ + @Override + public Object asyncSendBarrierMessage() { + if (transmitQ == null) { + return Boolean.FALSE; + } + + OFBarrierRequest barrierMsg = new OFBarrierRequest(); + int xid = getNextXid(); + + barrierMsg.setXid(xid); + transmitQ.add(new PriorityMessage(barrierMsg, 0, true)); + + return Boolean.TRUE; + } + /** * This method returns the switch liveness timeout value. If controller did * not receive any message from the switch for such a long period, * controller will tear down the connection to the switch. - * + * * @return The timeout value */ private static int getSwitchLivenessTimeout() { @@ -844,4 +866,63 @@ public class SwitchHandler implements ISwitch { return rv; } + + /** + * This method performs synchronous operations for a given message. If + * syncRequest is set to true, the message will be sent out followed by a + * Barrier request message. Then it's blocked until the Barrier rely arrives + * or timeout. If syncRequest is false, it simply skips the message send and + * just waits for the response back. + * + * @param msg + * Message to be sent + * @param xid + * Message XID + * @param request + * If set to true, the message the message will be sent out + * followed by a Barrier request message. If set to false, it + * simply skips the sending and just waits for the Barrier reply. + * @return the result + */ + private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) { + SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest); + messageWaitingDone.put(xid, worker); + Object result = null; + Boolean status = false; + Future submit; + try { + submit = executor.submit(worker); + } catch (RejectedExecutionException re) { + messageWaitingDone.remove(xid); + return result; + } + try { + result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); + messageWaitingDone.remove(xid); + if (result == null) { + // if result is null, then it means the switch can handle this + // message successfully + // convert the result into a Boolean with value true + status = true; + // logger.debug("Successfully send " + + // msg.getType().toString()); + result = status; + } else { + // if result is not null, this means the switch can't handle + // this message + // the result if OFError already + if (logger.isDebugEnabled()) { + logger.debug("Send {} failed --> {}", msg.getType(), (result)); + } + } + return result; + } catch (Exception e) { + logger.warn("Timeout while waiting for {} reply", msg.getType().toString()); + // convert the result into a Boolean with value false + status = false; + result = status; + worker.wakeup(); + return result; + } + } }