X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=91909d20f53a5bde5adea2b7020c38a78406f080;hb=78ef04c45c5a7fbee9bbb9ae77ecb1882add8623;hp=81729c3dec029b39502562fdb0dc2c0cfd35ce8f;hpb=a9e6627736e99183c5c6be4dd42ec364836acb80;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 81729c3dec..91909d20f5 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -8,10 +8,6 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; -import java.io.IOException; -import java.net.SocketException; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -32,12 +28,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.protocol_plugin.openflow.core.IController; -import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; +import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.openflow.protocol.OFBarrierReply; import org.openflow.protocol.OFBarrierRequest; import org.openflow.protocol.OFEchoReply; @@ -64,33 +61,32 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SwitchHandler implements ISwitch { - private static final Logger logger = LoggerFactory - .getLogger(SwitchHandler.class); + private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class); private static final int SWITCH_LIVENESS_TIMER = 5000; private static final int switchLivenessTimeout = getSwitchLivenessTimeout(); - private int MESSAGE_RESPONSE_TIMER = 2000; + private final int MESSAGE_RESPONSE_TIMER = 2000; - private String instanceName; - private ISwitch thisISwitch; - private IController core; + private final String instanceName; + private final ISwitch thisISwitch; + private final IController core; private Long sid; private Integer buffers; private Integer capabilities; private Byte tables; private Integer actions; private Selector selector; - private SocketChannel socket; - private BasicFactory factory; - private AtomicInteger xid; + private final SocketChannel socket; + private final BasicFactory factory; + private final AtomicInteger xid; private SwitchState state; private Timer periodicTimer; - private Map physicalPorts; - private Map portBandwidth; - private Date connectedDate; + private final Map physicalPorts; + private final Map portBandwidth; + private final Date connectedDate; private Long lastMsgReceivedTimeStamp; private Boolean probeSent; - private ExecutorService executor; - private ConcurrentHashMap> messageWaitingDone; + private final ExecutorService executor; + private final ConcurrentHashMap> messageWaitingDone; private boolean running; private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; @@ -99,8 +95,7 @@ public class SwitchHandler implements ISwitch { private Thread transmitThread; private enum SwitchState { - NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL( - 3); + NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3); private int value; @@ -141,9 +136,7 @@ public class SwitchHandler implements ISwitch { try { responseTimerValue = Integer.decode(rTimer); } catch (NumberFormatException e) { - logger.warn( - "Invalid of.messageResponseTimer: {} use default({})", - rTimer, MESSAGE_RESPONSE_TIMER); + logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER); } } } @@ -168,8 +161,7 @@ public class SwitchHandler implements ISwitch { try { // wait for an incoming connection selector.select(0); - Iterator selectedKeys = selector - .selectedKeys().iterator(); + Iterator selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey skey = selectedKeys.next(); selectedKeys.remove(); @@ -189,7 +181,9 @@ public class SwitchHandler implements ISwitch { switchHandlerThread.start(); } - public void stop() { + private void stopInternal() { + logger.debug("{} receives stop signal", + (isOperational() ? HexString.toHexString(sid) : "unknown")); running = false; cancelSwitchTimer(); try { @@ -205,9 +199,14 @@ public class SwitchHandler implements ISwitch { msgReadWriteService.stop(); } catch (Exception e) { } - executor.shutdown(); + logger.debug("executor shutdown now"); + executor.shutdownNow(); msgReadWriteService = null; + } + + public void stop() { + stopInternal(); if (switchHandlerThread != null) { switchHandlerThread.interrupt(); @@ -337,9 +336,7 @@ public class SwitchHandler implements ISwitch { */ private void asyncSendNow(OFMessage msg) { if (msgReadWriteService == null) { - logger.warn( - "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", - msg); + logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg); return; } @@ -362,8 +359,7 @@ public class SwitchHandler implements ISwitch { } if (msgs == null) { - logger.debug("{} is down", this); - // the connection is down, inform core + logger.info("{} is down", this); reportSwitchStateChange(false); return; } @@ -374,23 +370,19 @@ public class SwitchHandler implements ISwitch { switch (type) { case HELLO: // send feature request - OFMessage featureRequest = factory - .getMessage(OFType.FEATURES_REQUEST); + OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST); asyncFastSend(featureRequest); // delete all pre-existing flows OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL); - OFFlowMod flowMod = (OFFlowMod) factory - .getMessage(OFType.FLOW_MOD); - flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE) - .setOutPort(OFPort.OFPP_NONE) + OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD); + flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE) .setLength((short) OFFlowMod.MINIMUM_LENGTH); asyncFastSend(flowMod); this.state = SwitchState.WAIT_FEATURES_REPLY; startSwitchTimer(); break; case ECHO_REQUEST: - OFEchoReply echoReply = (OFEchoReply) factory - .getMessage(OFType.ECHO_REPLY); + OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY); // respond immediately asyncSendNow(echoReply, msg.getXid()); break; @@ -436,8 +428,7 @@ public class SwitchHandler implements ISwitch { updatePhysicalPort(port); } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) { updatePhysicalPort(port); - } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE - .ordinal()) { + } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) { deletePhysicalPort(port); } @@ -454,35 +445,29 @@ public class SwitchHandler implements ISwitch { if (probeSent) { // switch failed to respond to our probe, consider // it down - logger.warn("{} is idle for too long, disconnect", - toString()); + logger.warn("{} sid {} is idle for too long, disconnect", socket.socket() + .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown" + : HexString.toHexString(sid)); reportSwitchStateChange(false); } else { // send a probe to see if the switch is still alive - logger.debug( - "Send idle probe (Echo Request) to {}", - this); + logger.debug("Send idle probe (Echo Request) to {}", this); probeSent = true; - OFMessage echo = factory - .getMessage(OFType.ECHO_REQUEST); + OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST); asyncFastSend(echo); } } else { if (state == SwitchState.WAIT_FEATURES_REPLY) { // send another features request - OFMessage request = factory - .getMessage(OFType.FEATURES_REQUEST); + OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST); asyncFastSend(request); } else { if (state == SwitchState.WAIT_CONFIG_REPLY) { // send another config request - OFSetConfig config = (OFSetConfig) factory - .getMessage(OFType.SET_CONFIG); - config.setMissSendLength((short) 0xffff) - .setLengthU(OFSetConfig.MINIMUM_LENGTH); + OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG); + config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); - OFMessage getConfig = factory - .getMessage(OFType.GET_CONFIG_REQUEST); + OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST); asyncFastSend(getConfig); } } @@ -501,18 +486,18 @@ public class SwitchHandler implements ISwitch { } private void reportError(Exception e) { - if (e instanceof AsynchronousCloseException - || e instanceof InterruptedException - || e instanceof SocketException || e instanceof IOException - || e instanceof ClosedSelectorException) { - if (logger.isDebugEnabled()) { - logger.debug("Caught exception {}", e.getMessage()); - } - } else { - logger.warn("Caught exception ", e); + if (!running) { + logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(), + (isOperational() ? HexString.toHexString(sid) : "unknown")); + return; } + logger.debug("Caught exception: ", e); + // notify core of this error event and disconnect the switch ((Controller) core).takeSwitchEventError(this); + + // clean up some internal states immediately + stopInternal(); } private void reportSwitchStateChange(boolean added) { @@ -540,10 +525,8 @@ public class SwitchHandler implements ISwitch { updatePhysicalPort(port); } // config the switch to send full data packet - OFSetConfig config = (OFSetConfig) factory - .getMessage(OFType.SET_CONFIG); - config.setMissSendLength((short) 0xffff).setLengthU( - OFSetConfig.MINIMUM_LENGTH); + OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG); + config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); // send config request to make sure the switch can handle the set // config @@ -561,17 +544,11 @@ public class SwitchHandler implements ISwitch { portBandwidth .put(portNumber, port.getCurrentFeatures() - & (OFPortFeatures.OFPPF_10MB_FD.getValue() - | OFPortFeatures.OFPPF_10MB_HD - .getValue() - | OFPortFeatures.OFPPF_100MB_FD - .getValue() - | OFPortFeatures.OFPPF_100MB_HD - .getValue() - | OFPortFeatures.OFPPF_1GB_FD - .getValue() - | OFPortFeatures.OFPPF_1GB_HD - .getValue() | OFPortFeatures.OFPPF_10GB_FD + & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue() + | OFPortFeatures.OFPPF_100MB_FD.getValue() + | OFPortFeatures.OFPPF_100MB_HD.getValue() + | OFPortFeatures.OFPPF_1GB_FD.getValue() + | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD .getValue())); } @@ -589,13 +566,10 @@ public class SwitchHandler implements ISwitch { @Override public String toString() { try { - return ("Switch:" - + socket.socket().getRemoteSocketAddress().toString().split("/")[1] - + " SWID:" + (isOperational() ? HexString + return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString .toHexString(this.sid) : "unknown")); } catch (Exception e) { - return (isOperational() ? HexString.toHexString(this.sid) - : "unknown"); + return (isOperational() ? HexString.toHexString(this.sid) : "unknown"); } } @@ -614,20 +588,32 @@ public class SwitchHandler implements ISwitch { int xid = getNextXid(); StatisticsCollector worker = new StatisticsCollector(this, xid, req); messageWaitingDone.put(xid, worker); - Future submit = executor.submit(worker); + Future submit; Object result = null; + try { + submit = executor.submit(worker); + } catch (RejectedExecutionException re) { + messageWaitingDone.remove(xid); + return result; + } try { result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); return result; } catch (Exception e) { - logger.warn("Timeout while waiting for {} replies", req.getType()); + logger.warn("Timeout while waiting for {} replies from {}", + req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown")); result = null; // to indicate timeout has occurred + worker.wakeup(); return result; } } @Override public Object syncSend(OFMessage msg) { + if (!running) { + logger.debug("Switch is going down, ignore syncSend"); + return null; + } int xid = getNextXid(); return syncSend(msg, xid); } @@ -638,8 +624,7 @@ public class SwitchHandler implements ISwitch { */ private void processBarrierReply(OFBarrierReply msg) { Integer xid = msg.getXid(); - SynchronousMessage worker = (SynchronousMessage) messageWaitingDone - .remove(xid); + SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid); if (worker == null) { return; } @@ -671,8 +656,7 @@ public class SwitchHandler implements ISwitch { private void processStatsReply(OFStatisticsReply reply) { Integer xid = reply.getXid(); - StatisticsCollector worker = (StatisticsCollector) messageWaitingDone - .get(xid); + StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid); if (worker == null) { return; } @@ -742,8 +726,7 @@ public class SwitchHandler implements ISwitch { if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) { return false; } - if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK - .getValue()) { + if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) { return false; } return true; @@ -767,26 +750,21 @@ public class SwitchHandler implements ISwitch { * messaging service to transmit it over the socket channel */ class PriorityMessageTransmit implements Runnable { + @Override public void run() { running = true; while (running) { try { - while (!transmitQ.isEmpty()) { - PriorityMessage pmsg = transmitQ.poll(); - msgReadWriteService.asyncSend(pmsg.msg); - logger.trace("Message sent: {}", pmsg); - /* - * If syncReply is set to true, wait for the response - * back. - */ - if (pmsg.syncReply) { - syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); - } + PriorityMessage pmsg = transmitQ.take(); + msgReadWriteService.asyncSend(pmsg.msg); + /* + * If syncReply is set to true, wait for the response back. + */ + if (pmsg.syncReply) { + syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); } - Thread.sleep(10); } catch (InterruptedException ie) { - reportError(new InterruptedException( - "PriorityMessageTransmit thread interrupted")); + reportError(new InterruptedException("PriorityMessageTransmit thread interrupted")); } catch (Exception e) { reportError(e); } @@ -799,16 +777,16 @@ public class SwitchHandler implements ISwitch { * Setup and start the transmit thread */ private void startTransmitThread() { - this.transmitQ = new PriorityBlockingQueue(11, - new Comparator() { - public int compare(PriorityMessage p1, PriorityMessage p2) { - if (p2.priority != p1.priority) { - return p2.priority - p1.priority; - } else { - return (p2.seqNum < p1.seqNum) ? 1 : -1; - } - } - }); + this.transmitQ = new PriorityBlockingQueue(11, new Comparator() { + @Override + public int compare(PriorityMessage p1, PriorityMessage p2) { + if (p2.priority != p1.priority) { + return p2.priority - p1.priority; + } else { + return (p2.seqNum < p1.seqNum) ? 1 : -1; + } + } + }); this.transmitThread = new Thread(new PriorityMessageTransmit()); this.transmitThread.start(); } @@ -834,9 +812,8 @@ public class SwitchHandler implements ISwitch { private IMessageReadWrite getMessageReadWriteService() throws Exception { String str = System.getProperty("secureChannelEnabled"); - return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService( - socket, selector) : new MessageReadWriteService(socket, - selector); + return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket, + selector) : new MessageReadWriteService(socket, selector); } /** @@ -912,7 +889,13 @@ public class SwitchHandler implements ISwitch { messageWaitingDone.put(xid, worker); Object result = null; Boolean status = false; - Future submit = executor.submit(worker); + Future submit; + try { + submit = executor.submit(worker); + } catch (RejectedExecutionException re) { + messageWaitingDone.remove(xid); + return result; + } try { result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); messageWaitingDone.remove(xid); @@ -929,17 +912,16 @@ public class SwitchHandler implements ISwitch { // this message // the result if OFError already if (logger.isDebugEnabled()) { - logger.debug("Send {} failed --> {}", msg.getType(), - ((OFError) result)); + logger.debug("Send {} failed --> {}", msg.getType(), (result)); } } return result; } catch (Exception e) { - logger.warn("Timeout while waiting for {} reply", msg.getType() - .toString()); + logger.warn("Timeout while waiting for {} reply", msg.getType().toString()); // convert the result into a Boolean with value false status = false; result = status; + worker.wakeup(); return result; } }