- Respond to switch Echo Request immediately. It bypasses transmit queue and sends...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
index 57098ae3c6fe2f61429d72a7f10aac9cab5d8de6..5d51d26a988f553f0e79a3d2a29b408ddb486871 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 import java.io.IOException;
 import java.net.SocketException;
 import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -206,7 +207,6 @@ public class SwitchHandler implements ISwitch {
         }
         executor.shutdown();
 
-        selector = null;
         msgReadWriteService = null;
 
         if (switchHandlerThread != null) {
@@ -228,7 +228,7 @@ public class SwitchHandler implements ISwitch {
      * should be used for non-critical messages such as statistics request,
      * discovery packets, etc. An unique XID is generated automatically and
      * inserted into the message.
-     * 
+     *
      * @param msg
      *            The OF message to be sent
      * @return The XID used
@@ -239,38 +239,7 @@ public class SwitchHandler implements ISwitch {
     }
 
     private Object syncSend(OFMessage msg, int xid) {
-        SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
-        messageWaitingDone.put(xid, worker);
-        Object result = null;
-        Boolean status = false;
-        Future<Object> submit = executor.submit(worker);
-        try {
-            result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
-            messageWaitingDone.remove(xid);
-            if (result == null) {
-                // if result is null, then it means the switch can handle this
-                // message successfully
-                // convert the result into a Boolean with value true
-                status = true;
-                // logger.debug("Successfully send " +
-                // msg.getType().toString());
-                result = status;
-            } else {
-                // if result is not null, this means the switch can't handle
-                // this message
-                // the result if OFError already
-                logger.debug("Send {} failed --> {}", msg.getType().toString(),
-                        ((OFError) result).toString());
-            }
-            return result;
-        } catch (Exception e) {
-            logger.warn("Timeout while waiting for {} reply", msg.getType()
-                    .toString());
-            // convert the result into a Boolean with value false
-            status = false;
-            result = status;
-            return result;
-        }
+        return syncMessageInternal(msg, xid, true);
     }
 
     /**
@@ -278,7 +247,7 @@ public class SwitchHandler implements ISwitch {
      * priority. It will be served after high priority messages. The method
      * should be used for non-critical messages such as statistics request,
      * discovery packets, etc. The specified XID is inserted into the message.
-     * 
+     *
      * @param msg
      *            The OF message to be Sent
      * @param xid
@@ -300,7 +269,7 @@ public class SwitchHandler implements ISwitch {
      * method should be used for critical messages such as hello, echo reply
      * etc. An unique XID is generated automatically and inserted into the
      * message.
-     * 
+     *
      * @param msg
      *            The OF message to be sent
      * @return The XID used
@@ -315,7 +284,7 @@ public class SwitchHandler implements ISwitch {
      * priority. It will be served first before normal priority messages. The
      * method should be used for critical messages such as hello, echo reply
      * etc. The specified XID is inserted into the message.
-     * 
+     *
      * @param msg
      *            The OF message to be sent
      * @return The XID used
@@ -339,23 +308,67 @@ public class SwitchHandler implements ISwitch {
         }
     }
 
+    /**
+     * This method bypasses the transmit queue and sends the message over the
+     * socket directly. If the input xid is not null, the specified xid is
+     * inserted into the message. Otherwise, an unique xid is generated
+     * automatically and inserted into the message.
+     *
+     * @param msg
+     *            Message to be sent
+     * @param xid
+     *            Message xid
+     */
+    private void asyncSendNow(OFMessage msg, Integer xid) {
+        if (xid == null) {
+            xid = getNextXid();
+        }
+        msg.setXid(xid);
+
+        asyncSendNow(msg);
+    }
+
+    /**
+     * This method bypasses the transmit queue and sends the message over the
+     * socket directly.
+     *
+     * @param msg
+     *            Message to be sent
+     */
+    private void asyncSendNow(OFMessage msg) {
+        if (msgReadWriteService == null) {
+            logger.warn(
+                    "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
+                    msg);
+            return;
+        }
+
+        try {
+            msgReadWriteService.asyncSend(msg);
+        } catch (Exception e) {
+            reportError(e);
+        }
+    }
+
     public void handleMessages() {
         List<OFMessage> msgs = null;
 
         try {
-            msgs = msgReadWriteService.readMessages();
+            if (msgReadWriteService != null) {
+                msgs = msgReadWriteService.readMessages();
+            }
         } catch (Exception e) {
             reportError(e);
         }
 
         if (msgs == null) {
-            logger.debug("{} is down", toString());
+            logger.debug("{} is down", this);
             // the connection is down, inform core
             reportSwitchStateChange(false);
             return;
         }
         for (OFMessage msg : msgs) {
-            logger.trace("Message received: {}", msg.toString());
+            logger.trace("Message received: {}", msg);
             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
             OFType type = msg.getType();
             switch (type) {
@@ -378,7 +391,8 @@ public class SwitchHandler implements ISwitch {
             case ECHO_REQUEST:
                 OFEchoReply echoReply = (OFEchoReply) factory
                         .getMessage(OFType.ECHO_REPLY);
-                asyncFastSend(echoReply);
+                // respond immediately
+                asyncSendNow(echoReply, msg.getXid());
                 break;
             case ECHO_REPLY:
                 this.probeSent = false;
@@ -447,7 +461,7 @@ public class SwitchHandler implements ISwitch {
                             // send a probe to see if the switch is still alive
                             logger.debug(
                                     "Send idle probe (Echo Request) to {}",
-                                    toString());
+                                    this);
                             probeSent = true;
                             OFMessage echo = factory
                                     .getMessage(OFType.ECHO_REQUEST);
@@ -489,8 +503,11 @@ public class SwitchHandler implements ISwitch {
     private void reportError(Exception e) {
         if (e instanceof AsynchronousCloseException
                 || e instanceof InterruptedException
-                || e instanceof SocketException || e instanceof IOException) {
-            logger.debug("Caught exception {}", e.getMessage());
+                || e instanceof SocketException || e instanceof IOException
+                || e instanceof ClosedSelectorException) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Caught exception {}", e.getMessage());
+            }
         } else {
             logger.warn("Caught exception ", e);
         }
@@ -757,7 +774,14 @@ public class SwitchHandler implements ISwitch {
                     if (!transmitQ.isEmpty()) {
                         PriorityMessage pmsg = transmitQ.poll();
                         msgReadWriteService.asyncSend(pmsg.msg);
-                        logger.trace("Message sent: {}", pmsg.toString());
+                        logger.trace("Message sent: {}", pmsg);
+                        /*
+                         * If syncReply is set to true, wait for the response
+                         * back.
+                         */
+                        if (pmsg.syncReply) {
+                            syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+                        }
                     }
                     Thread.sleep(10);
                 } catch (InterruptedException ie) {
@@ -816,19 +840,40 @@ public class SwitchHandler implements ISwitch {
     }
 
     /**
-     * Sends synchronous Barrier message
+     * Send Barrier message synchronously. The caller will be blocked until the
+     * Barrier reply is received.
      */
     @Override
-    public Object sendBarrierMessage() {
+    public Object syncSendBarrierMessage() {
         OFBarrierRequest barrierMsg = new OFBarrierRequest();
         return syncSend(barrierMsg);
     }
 
+    /**
+     * Send Barrier message asynchronously. The caller is not blocked. The
+     * Barrier message will be sent in a transmit thread which will be blocked
+     * until the Barrier reply is received.
+     */
+    @Override
+    public Object asyncSendBarrierMessage() {
+        if (transmitQ == null) {
+            return Boolean.FALSE;
+        }
+
+        OFBarrierRequest barrierMsg = new OFBarrierRequest();
+        int xid = getNextXid();
+
+        barrierMsg.setXid(xid);
+        transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+
+        return Boolean.TRUE;
+    }
+
     /**
      * This method returns the switch liveness timeout value. If controller did
      * not receive any message from the switch for such a long period,
      * controller will tear down the connection to the switch.
-     * 
+     *
      * @return The timeout value
      */
     private static int getSwitchLivenessTimeout() {
@@ -844,4 +889,58 @@ public class SwitchHandler implements ISwitch {
 
         return rv;
     }
+
+    /**
+     * This method performs synchronous operations for a given message. If
+     * syncRequest is set to true, the message will be sent out followed by a
+     * Barrier request message. Then it's blocked until the Barrier rely arrives
+     * or timeout. If syncRequest is false, it simply skips the message send and
+     * just waits for the response back.
+     *
+     * @param msg
+     *            Message to be sent
+     * @param xid
+     *            Message XID
+     * @param request
+     *            If set to true, the message the message will be sent out
+     *            followed by a Barrier request message. If set to false, it
+     *            simply skips the sending and just waits for the Barrier reply.
+     * @return the result
+     */
+    private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+        SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+        messageWaitingDone.put(xid, worker);
+        Object result = null;
+        Boolean status = false;
+        Future<Object> submit = executor.submit(worker);
+        try {
+            result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+            messageWaitingDone.remove(xid);
+            if (result == null) {
+                // if result is null, then it means the switch can handle this
+                // message successfully
+                // convert the result into a Boolean with value true
+                status = true;
+                // logger.debug("Successfully send " +
+                // msg.getType().toString());
+                result = status;
+            } else {
+                // if result is not null, this means the switch can't handle
+                // this message
+                // the result if OFError already
+                if (logger.isDebugEnabled()) {
+                  logger.debug("Send {} failed --> {}", msg.getType(),
+                               ((OFError) result));
+                }
+            }
+            return result;
+        } catch (Exception e) {
+            logger.warn("Timeout while waiting for {} reply", msg.getType()
+                    .toString());
+            // convert the result into a Boolean with value false
+            status = false;
+            result = status;
+            return result;
+        }
+    }
 }