1. Controller switchEvents queue should be priority based. The queue holds switch...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
index 5d51d26a988f553f0e79a3d2a29b408ddb486871..91909d20f53a5bde5adea2b7020c38a78406f080 100644 (file)
@@ -8,10 +8,6 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 
-import java.io.IOException;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -32,12 +28,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
-import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.openflow.protocol.OFBarrierReply;
 import org.openflow.protocol.OFBarrierRequest;
 import org.openflow.protocol.OFEchoReply;
@@ -64,33 +61,32 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SwitchHandler implements ISwitch {
-    private static final Logger logger = LoggerFactory
-            .getLogger(SwitchHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
     private static final int SWITCH_LIVENESS_TIMER = 5000;
     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
-    private int MESSAGE_RESPONSE_TIMER = 2000;
+    private final int MESSAGE_RESPONSE_TIMER = 2000;
 
-    private String instanceName;
-    private ISwitch thisISwitch;
-    private IController core;
+    private final String instanceName;
+    private final ISwitch thisISwitch;
+    private final IController core;
     private Long sid;
     private Integer buffers;
     private Integer capabilities;
     private Byte tables;
     private Integer actions;
     private Selector selector;
-    private SocketChannel socket;
-    private BasicFactory factory;
-    private AtomicInteger xid;
+    private final SocketChannel socket;
+    private final BasicFactory factory;
+    private final AtomicInteger xid;
     private SwitchState state;
     private Timer periodicTimer;
-    private Map<Short, OFPhysicalPort> physicalPorts;
-    private Map<Short, Integer> portBandwidth;
-    private Date connectedDate;
+    private final Map<Short, OFPhysicalPort> physicalPorts;
+    private final Map<Short, Integer> portBandwidth;
+    private final Date connectedDate;
     private Long lastMsgReceivedTimeStamp;
     private Boolean probeSent;
-    private ExecutorService executor;
-    private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
+    private final ExecutorService executor;
+    private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
     private boolean running;
     private IMessageReadWrite msgReadWriteService;
     private Thread switchHandlerThread;
@@ -99,8 +95,7 @@ public class SwitchHandler implements ISwitch {
     private Thread transmitThread;
 
     private enum SwitchState {
-        NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
-                3);
+        NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
 
         private int value;
 
@@ -141,9 +136,7 @@ public class SwitchHandler implements ISwitch {
             try {
                 responseTimerValue = Integer.decode(rTimer);
             } catch (NumberFormatException e) {
-                logger.warn(
-                        "Invalid of.messageResponseTimer: {} use default({})",
-                        rTimer, MESSAGE_RESPONSE_TIMER);
+                logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
             }
         }
     }
@@ -168,8 +161,7 @@ public class SwitchHandler implements ISwitch {
                     try {
                         // wait for an incoming connection
                         selector.select(0);
-                        Iterator<SelectionKey> selectedKeys = selector
-                                .selectedKeys().iterator();
+                        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                         while (selectedKeys.hasNext()) {
                             SelectionKey skey = selectedKeys.next();
                             selectedKeys.remove();
@@ -189,7 +181,9 @@ public class SwitchHandler implements ISwitch {
         switchHandlerThread.start();
     }
 
-    public void stop() {
+    private void stopInternal() {
+        logger.debug("{} receives stop signal",
+                (isOperational() ? HexString.toHexString(sid) : "unknown"));
         running = false;
         cancelSwitchTimer();
         try {
@@ -205,9 +199,14 @@ public class SwitchHandler implements ISwitch {
             msgReadWriteService.stop();
         } catch (Exception e) {
         }
-        executor.shutdown();
+        logger.debug("executor shutdown now");
+        executor.shutdownNow();
 
         msgReadWriteService = null;
+    }
+
+    public void stop() {
+        stopInternal();
 
         if (switchHandlerThread != null) {
             switchHandlerThread.interrupt();
@@ -337,9 +336,7 @@ public class SwitchHandler implements ISwitch {
      */
     private void asyncSendNow(OFMessage msg) {
         if (msgReadWriteService == null) {
-            logger.warn(
-                    "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
-                    msg);
+            logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
             return;
         }
 
@@ -362,8 +359,7 @@ public class SwitchHandler implements ISwitch {
         }
 
         if (msgs == null) {
-            logger.debug("{} is down", this);
-            // the connection is down, inform core
+            logger.info("{} is down", this);
             reportSwitchStateChange(false);
             return;
         }
@@ -374,23 +370,19 @@ public class SwitchHandler implements ISwitch {
             switch (type) {
             case HELLO:
                 // send feature request
-                OFMessage featureRequest = factory
-                        .getMessage(OFType.FEATURES_REQUEST);
+                OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
                 asyncFastSend(featureRequest);
                 // delete all pre-existing flows
                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
-                OFFlowMod flowMod = (OFFlowMod) factory
-                        .getMessage(OFType.FLOW_MOD);
-                flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
-                        .setOutPort(OFPort.OFPP_NONE)
+                OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+                flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
                 asyncFastSend(flowMod);
                 this.state = SwitchState.WAIT_FEATURES_REPLY;
                 startSwitchTimer();
                 break;
             case ECHO_REQUEST:
-                OFEchoReply echoReply = (OFEchoReply) factory
-                        .getMessage(OFType.ECHO_REPLY);
+                OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
                 // respond immediately
                 asyncSendNow(echoReply, msg.getXid());
                 break;
@@ -436,8 +428,7 @@ public class SwitchHandler implements ISwitch {
             updatePhysicalPort(port);
         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
             updatePhysicalPort(port);
-        } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
-                .ordinal()) {
+        } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
             deletePhysicalPort(port);
         }
 
@@ -454,35 +445,29 @@ public class SwitchHandler implements ISwitch {
                         if (probeSent) {
                             // switch failed to respond to our probe, consider
                             // it down
-                            logger.warn("{} is idle for too long, disconnect",
-                                    toString());
+                            logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
+                                    .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
+                                    : HexString.toHexString(sid));
                             reportSwitchStateChange(false);
                         } else {
                             // send a probe to see if the switch is still alive
-                            logger.debug(
-                                    "Send idle probe (Echo Request) to {}",
-                                    this);
+                            logger.debug("Send idle probe (Echo Request) to {}", this);
                             probeSent = true;
-                            OFMessage echo = factory
-                                    .getMessage(OFType.ECHO_REQUEST);
+                            OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
                             asyncFastSend(echo);
                         }
                     } else {
                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
                             // send another features request
-                            OFMessage request = factory
-                                    .getMessage(OFType.FEATURES_REQUEST);
+                            OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
                             asyncFastSend(request);
                         } else {
                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
                                 // send another config request
-                                OFSetConfig config = (OFSetConfig) factory
-                                        .getMessage(OFType.SET_CONFIG);
-                                config.setMissSendLength((short) 0xffff)
-                                        .setLengthU(OFSetConfig.MINIMUM_LENGTH);
+                                OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+                                config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
                                 asyncFastSend(config);
-                                OFMessage getConfig = factory
-                                        .getMessage(OFType.GET_CONFIG_REQUEST);
+                                OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
                                 asyncFastSend(getConfig);
                             }
                         }
@@ -501,18 +486,18 @@ public class SwitchHandler implements ISwitch {
     }
 
     private void reportError(Exception e) {
-        if (e instanceof AsynchronousCloseException
-                || e instanceof InterruptedException
-                || e instanceof SocketException || e instanceof IOException
-                || e instanceof ClosedSelectorException) {
-            if (logger.isDebugEnabled()) {
-              logger.debug("Caught exception {}", e.getMessage());
-            }
-        } else {
-            logger.warn("Caught exception ", e);
+        if (!running) {
+            logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
+                    (isOperational() ? HexString.toHexString(sid) : "unknown"));
+            return;
         }
+        logger.debug("Caught exception: ", e);
+
         // notify core of this error event and disconnect the switch
         ((Controller) core).takeSwitchEventError(this);
+
+        // clean up some internal states immediately
+        stopInternal();
     }
 
     private void reportSwitchStateChange(boolean added) {
@@ -540,10 +525,8 @@ public class SwitchHandler implements ISwitch {
                 updatePhysicalPort(port);
             }
             // config the switch to send full data packet
-            OFSetConfig config = (OFSetConfig) factory
-                    .getMessage(OFType.SET_CONFIG);
-            config.setMissSendLength((short) 0xffff).setLengthU(
-                    OFSetConfig.MINIMUM_LENGTH);
+            OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+            config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
             asyncFastSend(config);
             // send config request to make sure the switch can handle the set
             // config
@@ -561,17 +544,11 @@ public class SwitchHandler implements ISwitch {
         portBandwidth
                 .put(portNumber,
                         port.getCurrentFeatures()
-                                & (OFPortFeatures.OFPPF_10MB_FD.getValue()
-                                        | OFPortFeatures.OFPPF_10MB_HD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_100MB_FD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_100MB_HD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_1GB_FD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_1GB_HD
-                                                .getValue() | OFPortFeatures.OFPPF_10GB_FD
+                                & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
+                                        | OFPortFeatures.OFPPF_100MB_FD.getValue()
+                                        | OFPortFeatures.OFPPF_100MB_HD.getValue()
+                                        | OFPortFeatures.OFPPF_1GB_FD.getValue()
+                                        | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
                                             .getValue()));
     }
 
@@ -589,13 +566,10 @@ public class SwitchHandler implements ISwitch {
     @Override
     public String toString() {
         try {
-            return ("Switch:"
-                    + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
-                    + " SWID:" + (isOperational() ? HexString
+            return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
                     .toHexString(this.sid) : "unknown"));
         } catch (Exception e) {
-            return (isOperational() ? HexString.toHexString(this.sid)
-                    : "unknown");
+            return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
         }
 
     }
@@ -614,20 +588,32 @@ public class SwitchHandler implements ISwitch {
         int xid = getNextXid();
         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
         messageWaitingDone.put(xid, worker);
-        Future<Object> submit = executor.submit(worker);
+        Future<Object> submit;
         Object result = null;
+        try {
+            submit = executor.submit(worker);
+        } catch (RejectedExecutionException re) {
+            messageWaitingDone.remove(xid);
+            return result;
+        }
         try {
             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for {} replies", req.getType());
+            logger.warn("Timeout while waiting for {} replies from {}",
+                    req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
             result = null; // to indicate timeout has occurred
+            worker.wakeup();
             return result;
         }
     }
 
     @Override
     public Object syncSend(OFMessage msg) {
+        if (!running) {
+            logger.debug("Switch is going down, ignore syncSend");
+            return null;
+        }
         int xid = getNextXid();
         return syncSend(msg, xid);
     }
@@ -638,8 +624,7 @@ public class SwitchHandler implements ISwitch {
      */
     private void processBarrierReply(OFBarrierReply msg) {
         Integer xid = msg.getXid();
-        SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
-                .remove(xid);
+        SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
         if (worker == null) {
             return;
         }
@@ -671,8 +656,7 @@ public class SwitchHandler implements ISwitch {
 
     private void processStatsReply(OFStatisticsReply reply) {
         Integer xid = reply.getXid();
-        StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
-                .get(xid);
+        StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
         if (worker == null) {
             return;
         }
@@ -742,8 +726,7 @@ public class SwitchHandler implements ISwitch {
         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
             return false;
         }
-        if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
-                .getValue()) {
+        if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
             return false;
         }
         return true;
@@ -767,26 +750,21 @@ public class SwitchHandler implements ISwitch {
      * messaging service to transmit it over the socket channel
      */
     class PriorityMessageTransmit implements Runnable {
+        @Override
         public void run() {
             running = true;
             while (running) {
                 try {
-                    if (!transmitQ.isEmpty()) {
-                        PriorityMessage pmsg = transmitQ.poll();
-                        msgReadWriteService.asyncSend(pmsg.msg);
-                        logger.trace("Message sent: {}", pmsg);
-                        /*
-                         * If syncReply is set to true, wait for the response
-                         * back.
-                         */
-                        if (pmsg.syncReply) {
-                            syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
-                        }
+                    PriorityMessage pmsg = transmitQ.take();
+                    msgReadWriteService.asyncSend(pmsg.msg);
+                    /*
+                     * If syncReply is set to true, wait for the response back.
+                     */
+                    if (pmsg.syncReply) {
+                        syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
                     }
-                    Thread.sleep(10);
                 } catch (InterruptedException ie) {
-                    reportError(new InterruptedException(
-                            "PriorityMessageTransmit thread interrupted"));
+                    reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
                 } catch (Exception e) {
                     reportError(e);
                 }
@@ -799,16 +777,16 @@ public class SwitchHandler implements ISwitch {
      * Setup and start the transmit thread
      */
     private void startTransmitThread() {
-        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
-                new Comparator<PriorityMessage>() {
-                    public int compare(PriorityMessage p1, PriorityMessage p2) {
-                        if (p2.priority != p1.priority) {
-                            return p2.priority - p1.priority;
-                        } else {
-                            return (p2.seqNum < p1.seqNum) ? 1 : -1;
-                        }
-                    }
-                });
+        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
+            @Override
+            public int compare(PriorityMessage p1, PriorityMessage p2) {
+                if (p2.priority != p1.priority) {
+                    return p2.priority - p1.priority;
+                } else {
+                    return (p2.seqNum < p1.seqNum) ? 1 : -1;
+                }
+            }
+        });
         this.transmitThread = new Thread(new PriorityMessageTransmit());
         this.transmitThread.start();
     }
@@ -834,9 +812,8 @@ public class SwitchHandler implements ISwitch {
 
     private IMessageReadWrite getMessageReadWriteService() throws Exception {
         String str = System.getProperty("secureChannelEnabled");
-        return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
-                socket, selector) : new MessageReadWriteService(socket,
-                selector);
+        return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
+                selector) : new MessageReadWriteService(socket, selector);
     }
 
     /**
@@ -912,7 +889,13 @@ public class SwitchHandler implements ISwitch {
         messageWaitingDone.put(xid, worker);
         Object result = null;
         Boolean status = false;
-        Future<Object> submit = executor.submit(worker);
+        Future<Object> submit;
+        try {
+           submit = executor.submit(worker);
+        } catch (RejectedExecutionException re) {
+            messageWaitingDone.remove(xid);
+            return result;
+        }
         try {
             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
             messageWaitingDone.remove(xid);
@@ -929,17 +912,16 @@ public class SwitchHandler implements ISwitch {
                 // this message
                 // the result if OFError already
                 if (logger.isDebugEnabled()) {
-                  logger.debug("Send {} failed --> {}", msg.getType(),
-                               ((OFError) result));
+                    logger.debug("Send {} failed --> {}", msg.getType(), (result));
                 }
             }
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for {} reply", msg.getType()
-                    .toString());
+            logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
             // convert the result into a Boolean with value false
             status = false;
             result = status;
+            worker.wakeup();
             return result;
         }
     }