This is a fix for the latch to countdown in case of exceptions and
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
index 9a05c3f4f100c46b2c357f59ae6256f4e497f491..52ea7fd575a76ae4f2053c5b1c96c7a10e622c0e 100644 (file)
@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
-import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.openflow.protocol.OFBarrierReply;
 import org.openflow.protocol.OFBarrierRequest;
 import org.openflow.protocol.OFEchoReply;
@@ -68,29 +68,29 @@ public class SwitchHandler implements ISwitch {
             .getLogger(SwitchHandler.class);
     private static final int SWITCH_LIVENESS_TIMER = 5000;
     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
-    private int MESSAGE_RESPONSE_TIMER = 2000;
+    private final int MESSAGE_RESPONSE_TIMER = 2000;
 
-    private String instanceName;
-    private ISwitch thisISwitch;
-    private IController core;
+    private final String instanceName;
+    private final ISwitch thisISwitch;
+    private final IController core;
     private Long sid;
     private Integer buffers;
     private Integer capabilities;
     private Byte tables;
     private Integer actions;
     private Selector selector;
-    private SocketChannel socket;
-    private BasicFactory factory;
-    private AtomicInteger xid;
+    private final SocketChannel socket;
+    private final BasicFactory factory;
+    private final AtomicInteger xid;
     private SwitchState state;
     private Timer periodicTimer;
-    private Map<Short, OFPhysicalPort> physicalPorts;
-    private Map<Short, Integer> portBandwidth;
-    private Date connectedDate;
+    private final Map<Short, OFPhysicalPort> physicalPorts;
+    private final Map<Short, Integer> portBandwidth;
+    private final Date connectedDate;
     private Long lastMsgReceivedTimeStamp;
     private Boolean probeSent;
-    private ExecutorService executor;
-    private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
+    private final ExecutorService executor;
+    private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
     private boolean running;
     private IMessageReadWrite msgReadWriteService;
     private Thread switchHandlerThread;
@@ -228,7 +228,7 @@ public class SwitchHandler implements ISwitch {
      * should be used for non-critical messages such as statistics request,
      * discovery packets, etc. An unique XID is generated automatically and
      * inserted into the message.
-     * 
+     *
      * @param msg
      *            The OF message to be sent
      * @return The XID used
@@ -247,7 +247,7 @@ public class SwitchHandler implements ISwitch {
      * priority. It will be served after high priority messages. The method
      * should be used for non-critical messages such as statistics request,
      * discovery packets, etc. The specified XID is inserted into the message.
-     * 
+     *
      * @param msg
      *            The OF message to be Sent
      * @param xid
@@ -269,7 +269,7 @@ public class SwitchHandler implements ISwitch {
      * method should be used for critical messages such as hello, echo reply
      * etc. An unique XID is generated automatically and inserted into the
      * message.
-     * 
+     *
      * @param msg
      *            The OF message to be sent
      * @return The XID used
@@ -284,7 +284,7 @@ public class SwitchHandler implements ISwitch {
      * priority. It will be served first before normal priority messages. The
      * method should be used for critical messages such as hello, echo reply
      * etc. The specified XID is inserted into the message.
-     * 
+     *
      * @param msg
      *            The OF message to be sent
      * @return The XID used
@@ -308,6 +308,48 @@ public class SwitchHandler implements ISwitch {
         }
     }
 
+    /**
+     * This method bypasses the transmit queue and sends the message over the
+     * socket directly. If the input xid is not null, the specified xid is
+     * inserted into the message. Otherwise, an unique xid is generated
+     * automatically and inserted into the message.
+     *
+     * @param msg
+     *            Message to be sent
+     * @param xid
+     *            Message xid
+     */
+    private void asyncSendNow(OFMessage msg, Integer xid) {
+        if (xid == null) {
+            xid = getNextXid();
+        }
+        msg.setXid(xid);
+
+        asyncSendNow(msg);
+    }
+
+    /**
+     * This method bypasses the transmit queue and sends the message over the
+     * socket directly.
+     *
+     * @param msg
+     *            Message to be sent
+     */
+    private void asyncSendNow(OFMessage msg) {
+        if (msgReadWriteService == null) {
+            logger.warn(
+                    "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
+                    msg);
+            return;
+        }
+
+        try {
+            msgReadWriteService.asyncSend(msg);
+        } catch (Exception e) {
+            reportError(e);
+        }
+    }
+
     public void handleMessages() {
         List<OFMessage> msgs = null;
 
@@ -320,13 +362,13 @@ public class SwitchHandler implements ISwitch {
         }
 
         if (msgs == null) {
-            logger.debug("{} is down", toString());
+            logger.debug("{} is down", this);
             // the connection is down, inform core
             reportSwitchStateChange(false);
             return;
         }
         for (OFMessage msg : msgs) {
-            logger.trace("Message received: {}", msg.toString());
+            logger.trace("Message received: {}", msg);
             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
             OFType type = msg.getType();
             switch (type) {
@@ -349,7 +391,8 @@ public class SwitchHandler implements ISwitch {
             case ECHO_REQUEST:
                 OFEchoReply echoReply = (OFEchoReply) factory
                         .getMessage(OFType.ECHO_REPLY);
-                asyncFastSend(echoReply);
+                // respond immediately
+                asyncSendNow(echoReply, msg.getXid());
                 break;
             case ECHO_REPLY:
                 this.probeSent = false;
@@ -418,7 +461,7 @@ public class SwitchHandler implements ISwitch {
                             // send a probe to see if the switch is still alive
                             logger.debug(
                                     "Send idle probe (Echo Request) to {}",
-                                    toString());
+                                    this);
                             probeSent = true;
                             OFMessage echo = factory
                                     .getMessage(OFType.ECHO_REQUEST);
@@ -462,7 +505,9 @@ public class SwitchHandler implements ISwitch {
                 || e instanceof InterruptedException
                 || e instanceof SocketException || e instanceof IOException
                 || e instanceof ClosedSelectorException) {
-            logger.debug("Caught exception {}", e.getMessage());
+            if (logger.isDebugEnabled()) {
+              logger.debug("Caught exception {}", e.getMessage());
+            }
         } else {
             logger.warn("Caught exception ", e);
         }
@@ -577,6 +622,7 @@ public class SwitchHandler implements ISwitch {
         } catch (Exception e) {
             logger.warn("Timeout while waiting for {} replies", req.getType());
             result = null; // to indicate timeout has occurred
+            worker.wakeup();
             return result;
         }
     }
@@ -722,23 +768,19 @@ public class SwitchHandler implements ISwitch {
      * messaging service to transmit it over the socket channel
      */
     class PriorityMessageTransmit implements Runnable {
+        @Override
         public void run() {
             running = true;
             while (running) {
                 try {
-                    if (!transmitQ.isEmpty()) {
-                        PriorityMessage pmsg = transmitQ.poll();
-                        msgReadWriteService.asyncSend(pmsg.msg);
-                        logger.trace("Message sent: {}", pmsg.toString());
-                        /*
-                         * If syncReply is set to true, wait for the response
-                         * back.
-                         */
-                        if (pmsg.syncReply) {
-                            syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
-                        }
+                    PriorityMessage pmsg = transmitQ.take();
+                    msgReadWriteService.asyncSend(pmsg.msg);
+                    /*
+                     * If syncReply is set to true, wait for the response back.
+                     */
+                    if (pmsg.syncReply) {
+                        syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
                     }
-                    Thread.sleep(10);
                 } catch (InterruptedException ie) {
                     reportError(new InterruptedException(
                             "PriorityMessageTransmit thread interrupted"));
@@ -756,6 +798,7 @@ public class SwitchHandler implements ISwitch {
     private void startTransmitThread() {
         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
                 new Comparator<PriorityMessage>() {
+                    @Override
                     public int compare(PriorityMessage p1, PriorityMessage p2) {
                         if (p2.priority != p1.priority) {
                             return p2.priority - p1.priority;
@@ -820,7 +863,7 @@ public class SwitchHandler implements ISwitch {
 
         barrierMsg.setXid(xid);
         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
-        
+
         return Boolean.TRUE;
     }
 
@@ -828,7 +871,7 @@ public class SwitchHandler implements ISwitch {
      * This method returns the switch liveness timeout value. If controller did
      * not receive any message from the switch for such a long period,
      * controller will tear down the connection to the switch.
-     * 
+     *
      * @return The timeout value
      */
     private static int getSwitchLivenessTimeout() {
@@ -851,7 +894,7 @@ public class SwitchHandler implements ISwitch {
      * Barrier request message. Then it's blocked until the Barrier rely arrives
      * or timeout. If syncRequest is false, it simply skips the message send and
      * just waits for the response back.
-     * 
+     *
      * @param msg
      *            Message to be sent
      * @param xid
@@ -883,8 +926,10 @@ public class SwitchHandler implements ISwitch {
                 // if result is not null, this means the switch can't handle
                 // this message
                 // the result if OFError already
-                logger.debug("Send {} failed --> {}", msg.getType().toString(),
-                        ((OFError) result).toString());
+                if (logger.isDebugEnabled()) {
+                  logger.debug("Send {} failed --> {}", msg.getType(),
+                               (result));
+                }
             }
             return result;
         } catch (Exception e) {
@@ -893,6 +938,7 @@ public class SwitchHandler implements ISwitch {
             // convert the result into a Boolean with value false
             status = false;
             result = status;
+            worker.wakeup();
             return result;
         }
     }