- Respond to switch Echo Request immediately. It bypasses transmit queue and sends...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
index 5913ad0dd960036f3362c5e2325d0af5209529ab..5d51d26a988f553f0e79a3d2a29b408ddb486871 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -9,8 +8,10 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 
+import java.io.IOException;
 import java.net.SocketException;
 import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -38,6 +39,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
 import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFBarrierRequest;
 import org.openflow.protocol.OFEchoReply;
 import org.openflow.protocol.OFError;
 import org.openflow.protocol.OFFeaturesReply;
@@ -65,7 +67,7 @@ public class SwitchHandler implements ISwitch {
     private static final Logger logger = LoggerFactory
             .getLogger(SwitchHandler.class);
     private static final int SWITCH_LIVENESS_TIMER = 5000;
-    private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
+    private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
     private int MESSAGE_RESPONSE_TIMER = 2000;
 
     private String instanceName;
@@ -93,9 +95,9 @@ public class SwitchHandler implements ISwitch {
     private IMessageReadWrite msgReadWriteService;
     private Thread switchHandlerThread;
     private Integer responseTimerValue;
-       private PriorityBlockingQueue<PriorityMessage> transmitQ;
+    private PriorityBlockingQueue<PriorityMessage> transmitQ;
     private Thread transmitThread;
-    
+
     private enum SwitchState {
         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
                 3);
@@ -116,10 +118,10 @@ public class SwitchHandler implements ISwitch {
         this.instanceName = name;
         this.thisISwitch = this;
         this.sid = (long) 0;
-        this.buffers = (int)0;
-        this.capabilities = (int)0;
-        this.tables = (byte)0;
-        this.actions = (int)0;
+        this.buffers = (int) 0;
+        this.capabilities = (int) 0;
+        this.tables = (byte) 0;
+        this.actions = (int) 0;
         this.core = core;
         this.socket = sc;
         this.factory = new BasicFactory();
@@ -136,20 +138,21 @@ public class SwitchHandler implements ISwitch {
         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
         String rTimer = System.getProperty("of.messageResponseTimer");
         if (rTimer != null) {
-               try {
-                       responseTimerValue = Integer.decode(rTimer);
-               } catch (NumberFormatException e) {
-                               logger.warn("Invalid of.messageResponseTimer: {} use default({})",
-                                               rTimer, MESSAGE_RESPONSE_TIMER);
-               }
+            try {
+                responseTimerValue = Integer.decode(rTimer);
+            } catch (NumberFormatException e) {
+                logger.warn(
+                        "Invalid of.messageResponseTimer: {} use default({})",
+                        rTimer, MESSAGE_RESPONSE_TIMER);
+            }
         }
-       }
+    }
 
     public void start() {
         try {
-               startTransmitThread();
-               setupCommChannel();
-               sendFirstHello();
+            startTransmitThread();
+            setupCommChannel();
+            sendFirstHello();
             startHandlerThread();
         } catch (Exception e) {
             reportError(e);
@@ -163,7 +166,7 @@ public class SwitchHandler implements ISwitch {
                 running = true;
                 while (running) {
                     try {
-                       // wait for an incoming connection
+                        // wait for an incoming connection
                         selector.select(0);
                         Iterator<SelectionKey> selectedKeys = selector
                                 .selectedKeys().iterator();
@@ -178,7 +181,7 @@ public class SwitchHandler implements ISwitch {
                             }
                         }
                     } catch (Exception e) {
-                       reportError(e);
+                        reportError(e);
                     }
                 }
             }
@@ -187,33 +190,31 @@ public class SwitchHandler implements ISwitch {
     }
 
     public void stop() {
-       running = false;
-       cancelSwitchTimer();
-       try {
-               selector.wakeup();
-               selector.close();
-               } catch (Exception e) {
-               }
-       try {
-                       socket.close();
-               } catch (Exception e) {
-               }
-       try {
-                       msgReadWriteService.stop();
-               } catch (Exception e) {
-               }
-       executor.shutdown();
-       
-       selector = null;
-       socket = null;
-               msgReadWriteService = null;
-               
-               if (switchHandlerThread != null) {
-                       switchHandlerThread.interrupt();
-               }
-               if (transmitThread != null) {
-                       transmitThread.interrupt();
-               }
+        running = false;
+        cancelSwitchTimer();
+        try {
+            selector.wakeup();
+            selector.close();
+        } catch (Exception e) {
+        }
+        try {
+            socket.close();
+        } catch (Exception e) {
+        }
+        try {
+            msgReadWriteService.stop();
+        } catch (Exception e) {
+        }
+        executor.shutdown();
+
+        msgReadWriteService = null;
+
+        if (switchHandlerThread != null) {
+            switchHandlerThread.interrupt();
+        }
+        if (transmitThread != null) {
+            transmitThread.interrupt();
+        }
     }
 
     @Override
@@ -221,106 +222,153 @@ public class SwitchHandler implements ISwitch {
         return this.xid.incrementAndGet();
     }
 
-       /**
-        * This method puts the message in an outgoing priority queue with normal
-        * priority. It will be served after high priority messages. The method
-        * should be used for non-critical messages such as statistics request,
-        * discovery packets, etc. An unique XID is generated automatically and
-        * inserted into the message.
-        * 
-        * @param msg The OF message to be sent
-        * @return The XID used
-        */
+    /**
+     * This method puts the message in an outgoing priority queue with normal
+     * priority. It will be served after high priority messages. The method
+     * should be used for non-critical messages such as statistics request,
+     * discovery packets, etc. An unique XID is generated automatically and
+     * inserted into the message.
+     *
+     * @param msg
+     *            The OF message to be sent
+     * @return The XID used
+     */
     @Override
     public Integer asyncSend(OFMessage msg) {
-       return asyncSend(msg, getNextXid());
-    }
-
-       /**
-        * This method puts the message in an outgoing priority queue with normal
-        * priority. It will be served after high priority messages. The method
-        * should be used for non-critical messages such as statistics request,
-        * discovery packets, etc. The specified XID is inserted into the message.
-        * 
-        * @param msg The OF message to be Sent
-        * @param xid The XID to be used in the message
-        * @return The XID used
-        */
+        return asyncSend(msg, getNextXid());
+    }
+
+    private Object syncSend(OFMessage msg, int xid) {
+        return syncMessageInternal(msg, xid, true);
+    }
+
+    /**
+     * This method puts the message in an outgoing priority queue with normal
+     * priority. It will be served after high priority messages. The method
+     * should be used for non-critical messages such as statistics request,
+     * discovery packets, etc. The specified XID is inserted into the message.
+     *
+     * @param msg
+     *            The OF message to be Sent
+     * @param xid
+     *            The XID to be used in the message
+     * @return The XID used
+     */
     @Override
     public Integer asyncSend(OFMessage msg, int xid) {
-       msg.setXid(xid);
-       if (transmitQ != null) {
-               transmitQ.add(new PriorityMessage(msg, 0));
-       }
+        msg.setXid(xid);
+        if (transmitQ != null) {
+            transmitQ.add(new PriorityMessage(msg, 0));
+        }
         return xid;
     }
 
-       /**
-        * This method puts the message in an outgoing priority queue with high
-        * priority. It will be served first before normal priority messages. The
-        * method should be used for critical messages such as hello, echo reply
-        * etc. An unique XID is generated automatically and inserted into the
-        * message.
-        * 
-        * @param msg The OF message to be sent
-        * @return The XID used
-        */
+    /**
+     * This method puts the message in an outgoing priority queue with high
+     * priority. It will be served first before normal priority messages. The
+     * method should be used for critical messages such as hello, echo reply
+     * etc. An unique XID is generated automatically and inserted into the
+     * message.
+     *
+     * @param msg
+     *            The OF message to be sent
+     * @return The XID used
+     */
     @Override
     public Integer asyncFastSend(OFMessage msg) {
-       return asyncFastSend(msg, getNextXid());
-    }
-
-       /**
-        * This method puts the message in an outgoing priority queue with high
-        * priority. It will be served first before normal priority messages. The
-        * method should be used for critical messages such as hello, echo reply
-        * etc. The specified XID is inserted into the message.
-        * 
-        * @param msg The OF message to be sent
-        * @return The XID used
-        */
+        return asyncFastSend(msg, getNextXid());
+    }
+
+    /**
+     * This method puts the message in an outgoing priority queue with high
+     * priority. It will be served first before normal priority messages. The
+     * method should be used for critical messages such as hello, echo reply
+     * etc. The specified XID is inserted into the message.
+     *
+     * @param msg
+     *            The OF message to be sent
+     * @return The XID used
+     */
     @Override
     public Integer asyncFastSend(OFMessage msg, int xid) {
-       msg.setXid(xid);
-       if (transmitQ != null) {
-               transmitQ.add(new PriorityMessage(msg, 1));
-       }
+        msg.setXid(xid);
+        if (transmitQ != null) {
+            transmitQ.add(new PriorityMessage(msg, 1));
+        }
         return xid;
     }
 
-   public void resumeSend() {
+    public void resumeSend() {
         try {
-               if (msgReadWriteService != null) {
-                       msgReadWriteService.resumeSend();
-               }
-               } catch (Exception e) {
-                       reportError(e);
-               }
+            if (msgReadWriteService != null) {
+                msgReadWriteService.resumeSend();
+            }
+        } catch (Exception e) {
+            reportError(e);
+        }
+    }
+
+    /**
+     * This method bypasses the transmit queue and sends the message over the
+     * socket directly. If the input xid is not null, the specified xid is
+     * inserted into the message. Otherwise, an unique xid is generated
+     * automatically and inserted into the message.
+     *
+     * @param msg
+     *            Message to be sent
+     * @param xid
+     *            Message xid
+     */
+    private void asyncSendNow(OFMessage msg, Integer xid) {
+        if (xid == null) {
+            xid = getNextXid();
+        }
+        msg.setXid(xid);
+
+        asyncSendNow(msg);
+    }
+
+    /**
+     * This method bypasses the transmit queue and sends the message over the
+     * socket directly.
+     *
+     * @param msg
+     *            Message to be sent
+     */
+    private void asyncSendNow(OFMessage msg) {
+        if (msgReadWriteService == null) {
+            logger.warn(
+                    "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
+                    msg);
+            return;
+        }
+
+        try {
+            msgReadWriteService.asyncSend(msg);
+        } catch (Exception e) {
+            reportError(e);
+        }
     }
 
     public void handleMessages() {
         List<OFMessage> msgs = null;
-        
+
         try {
-               msgs = msgReadWriteService.readMessages();
-               } catch (Exception e) {
-                       reportError(e);
-               }
-               
+            if (msgReadWriteService != null) {
+                msgs = msgReadWriteService.readMessages();
+            }
+        } catch (Exception e) {
+            reportError(e);
+        }
+
         if (msgs == null) {
-            logger.debug("{} is down", toString());
+            logger.debug("{} is down", this);
             // the connection is down, inform core
             reportSwitchStateChange(false);
             return;
         }
         for (OFMessage msg : msgs) {
-            logger.trace("Message received: {}", msg.toString());
-            /*
-            if  ((msg.getType() != OFType.ECHO_REQUEST) &&
-                       (msg.getType() != OFType.ECHO_REPLY)) {
-               logger.debug(msg.getType().toString() + " received from sw " + toString());
-            }
-             */
+            logger.trace("Message received: {}", msg);
             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
             OFType type = msg.getType();
             switch (type) {
@@ -334,8 +382,8 @@ public class SwitchHandler implements ISwitch {
                 OFFlowMod flowMod = (OFFlowMod) factory
                         .getMessage(OFType.FLOW_MOD);
                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
-                        .setOutPort(OFPort.OFPP_NONE).setLength(
-                                (short) OFFlowMod.MINIMUM_LENGTH);
+                        .setOutPort(OFPort.OFPP_NONE)
+                        .setLength((short) OFFlowMod.MINIMUM_LENGTH);
                 asyncFastSend(flowMod);
                 this.state = SwitchState.WAIT_FEATURES_REPLY;
                 startSwitchTimer();
@@ -343,7 +391,8 @@ public class SwitchHandler implements ISwitch {
             case ECHO_REQUEST:
                 OFEchoReply echoReply = (OFEchoReply) factory
                         .getMessage(OFType.ECHO_REPLY);
-                asyncFastSend(echoReply);
+                // respond immediately
+                asyncSendNow(echoReply, msg.getXid());
                 break;
             case ECHO_REPLY:
                 this.probeSent = false;
@@ -352,7 +401,8 @@ public class SwitchHandler implements ISwitch {
                 processFeaturesReply((OFFeaturesReply) msg);
                 break;
             case GET_CONFIG_REPLY:
-                // make sure that the switch can send the whole packet to the controller
+                // make sure that the switch can send the whole packet to the
+                // controller
                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
                     this.state = SwitchState.OPERATIONAL;
                 }
@@ -381,18 +431,14 @@ public class SwitchHandler implements ISwitch {
     }
 
     private void processPortStatusMsg(OFPortStatus msg) {
-        //short portNumber = msg.getDesc().getPortNumber();
         OFPhysicalPort port = msg.getDesc();
         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
             updatePhysicalPort(port);
-            //logger.debug("Port " + portNumber + " on " + toString() + " modified");
         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
             updatePhysicalPort(port);
-            //logger.debug("Port " + portNumber + " on " + toString() + " added");
         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
                 .ordinal()) {
             deletePhysicalPort(port);
-            //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
         }
 
     }
@@ -404,14 +450,18 @@ public class SwitchHandler implements ISwitch {
             public void run() {
                 try {
                     Long now = System.currentTimeMillis();
-                    if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
+                    if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
                         if (probeSent) {
-                            // switch failed to respond to our probe, consider it down
-                            logger.warn("{} is idle for too long, disconnect", toString());
+                            // switch failed to respond to our probe, consider
+                            // it down
+                            logger.warn("{} is idle for too long, disconnect",
+                                    toString());
                             reportSwitchStateChange(false);
                         } else {
                             // send a probe to see if the switch is still alive
-                            //logger.debug("Send idle probe (Echo Request) to " + switchName());
+                            logger.debug(
+                                    "Send idle probe (Echo Request) to {}",
+                                    this);
                             probeSent = true;
                             OFMessage echo = factory
                                     .getMessage(OFType.ECHO_REQUEST);
@@ -425,7 +475,7 @@ public class SwitchHandler implements ISwitch {
                             asyncFastSend(request);
                         } else {
                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
-                                //  send another config request
+                                // send another config request
                                 OFSetConfig config = (OFSetConfig) factory
                                         .getMessage(OFType.SET_CONFIG);
                                 config.setMissSendLength((short) 0xffff)
@@ -451,20 +501,23 @@ public class SwitchHandler implements ISwitch {
     }
 
     private void reportError(Exception e) {
-       if (e instanceof AsynchronousCloseException ||
-               e instanceof InterruptedException ||
-               e instanceof SocketException) {
-               logger.debug("Caught exception {}", e.getMessage());
-       } else {
-               logger.warn("Caught exception {}", e.getMessage());
-       }
+        if (e instanceof AsynchronousCloseException
+                || e instanceof InterruptedException
+                || e instanceof SocketException || e instanceof IOException
+                || e instanceof ClosedSelectorException) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Caught exception {}", e.getMessage());
+            }
+        } else {
+            logger.warn("Caught exception ", e);
+        }
         // notify core of this error event and disconnect the switch
         ((Controller) core).takeSwitchEventError(this);
     }
 
     private void reportSwitchStateChange(boolean added) {
         if (added) {
-            ((Controller) core).takeSwtichEventAdd(this);
+            ((Controller) core).takeSwitchEventAdd(this);
         } else {
             ((Controller) core).takeSwitchEventDelete(this);
         }
@@ -492,7 +545,8 @@ public class SwitchHandler implements ISwitch {
             config.setMissSendLength((short) 0xffff).setLengthU(
                     OFSetConfig.MINIMUM_LENGTH);
             asyncFastSend(config);
-            // send config request to make sure the switch can handle the set config
+            // send config request to make sure the switch can handle the set
+            // config
             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
             asyncFastSend(getConfig);
             this.state = SwitchState.WAIT_CONFIG_REPLY;
@@ -505,8 +559,7 @@ public class SwitchHandler implements ISwitch {
         Short portNumber = port.getPortNumber();
         physicalPorts.put(portNumber, port);
         portBandwidth
-                .put(
-                        portNumber,
+                .put(portNumber,
                         port.getCurrentFeatures()
                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
                                         | OFPortFeatures.OFPPF_10MB_HD
@@ -519,7 +572,7 @@ public class SwitchHandler implements ISwitch {
                                                 .getValue()
                                         | OFPortFeatures.OFPPF_1GB_HD
                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
-                                        .getValue()));
+                                            .getValue()));
     }
 
     private void deletePhysicalPort(OFPhysicalPort port) {
@@ -535,11 +588,16 @@ public class SwitchHandler implements ISwitch {
 
     @Override
     public String toString() {
-        return ("["
-                + this.socket.toString()
-                + " SWID "
-                + (isOperational() ? HexString.toHexString(this.sid)
-                        : "unkbown") + "]");
+        try {
+            return ("Switch:"
+                    + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
+                    + " SWID:" + (isOperational() ? HexString
+                    .toHexString(this.sid) : "unknown"));
+        } catch (Exception e) {
+            return (isOperational() ? HexString.toHexString(this.sid)
+                    : "unknown");
+        }
+
     }
 
     @Override
@@ -559,8 +617,7 @@ public class SwitchHandler implements ISwitch {
         Future<Object> submit = executor.submit(worker);
         Object result = null;
         try {
-            result = submit
-                    .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
+            result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
             return result;
         } catch (Exception e) {
             logger.warn("Timeout while waiting for {} replies", req.getType());
@@ -571,41 +628,13 @@ public class SwitchHandler implements ISwitch {
 
     @Override
     public Object syncSend(OFMessage msg) {
-        Integer xid = getNextXid();
-        SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
-        messageWaitingDone.put(xid, worker);
-        Object result = null;
-        Boolean status = false;
-        Future<Object> submit = executor.submit(worker);
-        try {
-            result = submit
-                    .get(responseTimerValue, TimeUnit.MILLISECONDS);
-            messageWaitingDone.remove(xid);
-            if (result == null) {
-                // if result  is null, then it means the switch can handle this message successfully
-                // convert the result into a Boolean with value true
-                status = true;
-                //logger.debug("Successfully send " + msg.getType().toString());
-                result = status;
-            } else {
-                // if result  is not null, this means the switch can't handle this message
-                // the result if OFError already
-                logger.debug("Send {} failed --> {}", 
-                               msg.getType().toString(), ((OFError) result).toString());
-            }
-            return result;
-        } catch (Exception e) {
-            logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
-            // convert the result into a Boolean with value false
-            status = false;
-            result = status;
-            return result;
-        }
+        int xid = getNextXid();
+        return syncSend(msg, xid);
     }
 
     /*
-     * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
-     * wake up associated task so that it can continue
+     * Either a BarrierReply or a OFError is received. If this is a reply for an
+     * outstanding sync message, wake up associated task so that it can continue
      */
     private void processBarrierReply(OFBarrierReply msg) {
         Integer xid = msg.getXid();
@@ -626,7 +655,8 @@ public class SwitchHandler implements ISwitch {
             xid = errorMsg.getXid();
         }
         /*
-         * the error can be a reply to a synchronous message or to a statistic request message
+         * the error can be a reply to a synchronous message or to a statistic
+         * request message
          */
         Callable<?> worker = messageWaitingDone.remove(xid);
         if (worker == null) {
@@ -653,7 +683,7 @@ public class SwitchHandler implements ISwitch {
             worker.wakeup();
         }
     }
-    
+
     @Override
     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
         return this.physicalPorts;
@@ -678,12 +708,12 @@ public class SwitchHandler implements ISwitch {
     public Byte getTables() {
         return this.tables;
     }
-    
+
     @Override
     public Integer getActions() {
         return this.actions;
     }
-    
+
     @Override
     public Integer getCapabilities() {
         return this.capabilities;
@@ -732,68 +762,185 @@ public class SwitchHandler implements ISwitch {
         return result;
     }
 
-       /*
-        * Transmit thread polls the message out of the priority queue and invokes
-        * messaging service to transmit it over the socket channel
-        */
+    /*
+     * Transmit thread polls the message out of the priority queue and invokes
+     * messaging service to transmit it over the socket channel
+     */
     class PriorityMessageTransmit implements Runnable {
         public void run() {
             running = true;
             while (running) {
-               try {
-                       if (!transmitQ.isEmpty()) {
-                               PriorityMessage pmsg = transmitQ.poll();
-                               msgReadWriteService.asyncSend(pmsg.msg);
-                               logger.trace("Message sent: {}", pmsg.toString());
-                       }
-                       Thread.sleep(10);
-               } catch (InterruptedException ie) {
-                       reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
-               } catch (Exception e) {
-                       reportError(e);
-               }
+                try {
+                    if (!transmitQ.isEmpty()) {
+                        PriorityMessage pmsg = transmitQ.poll();
+                        msgReadWriteService.asyncSend(pmsg.msg);
+                        logger.trace("Message sent: {}", pmsg);
+                        /*
+                         * If syncReply is set to true, wait for the response
+                         * back.
+                         */
+                        if (pmsg.syncReply) {
+                            syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+                        }
+                    }
+                    Thread.sleep(10);
+                } catch (InterruptedException ie) {
+                    reportError(new InterruptedException(
+                            "PriorityMessageTransmit thread interrupted"));
+                } catch (Exception e) {
+                    reportError(e);
+                }
             }
-               transmitQ = null;
+            transmitQ = null;
         }
     }
 
     /*
      * Setup and start the transmit thread
      */
-    private void startTransmitThread() {       
-        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, 
-                               new Comparator<PriorityMessage>() {
-                                       public int compare(PriorityMessage p1, PriorityMessage p2) {
-                                               return p2.priority - p1.priority;
-                                       }
-                               });
+    private void startTransmitThread() {
+        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
+                new Comparator<PriorityMessage>() {
+                    public int compare(PriorityMessage p1, PriorityMessage p2) {
+                        if (p2.priority != p1.priority) {
+                            return p2.priority - p1.priority;
+                        } else {
+                            return (p2.seqNum < p1.seqNum) ? 1 : -1;
+                        }
+                    }
+                });
         this.transmitThread = new Thread(new PriorityMessageTransmit());
         this.transmitThread.start();
     }
-    
+
     /*
      * Setup communication services
      */
     private void setupCommChannel() throws Exception {
         this.selector = SelectorProvider.provider().openSelector();
         this.socket.configureBlocking(false);
-        this.socket.socket().setTcpNoDelay(true);        
+        this.socket.socket().setTcpNoDelay(true);
         this.msgReadWriteService = getMessageReadWriteService();
     }
 
     private void sendFirstHello() {
-       try {
-               OFMessage msg = factory.getMessage(OFType.HELLO);
-               asyncFastSend(msg);
-       } catch (Exception e) {
-               reportError(e);
-       }
-    }
-    
+        try {
+            OFMessage msg = factory.getMessage(OFType.HELLO);
+            asyncFastSend(msg);
+        } catch (Exception e) {
+            reportError(e);
+        }
+    }
+
     private IMessageReadWrite getMessageReadWriteService() throws Exception {
-       String str = System.getProperty("secureChannelEnabled");
-        return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? 
-                       new SecureMessageReadWriteService(socket, selector) : 
-                       new MessageReadWriteService(socket, selector);
+        String str = System.getProperty("secureChannelEnabled");
+        return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
+                socket, selector) : new MessageReadWriteService(socket,
+                selector);
+    }
+
+    /**
+     * Send Barrier message synchronously. The caller will be blocked until the
+     * Barrier reply is received.
+     */
+    @Override
+    public Object syncSendBarrierMessage() {
+        OFBarrierRequest barrierMsg = new OFBarrierRequest();
+        return syncSend(barrierMsg);
+    }
+
+    /**
+     * Send Barrier message asynchronously. The caller is not blocked. The
+     * Barrier message will be sent in a transmit thread which will be blocked
+     * until the Barrier reply is received.
+     */
+    @Override
+    public Object asyncSendBarrierMessage() {
+        if (transmitQ == null) {
+            return Boolean.FALSE;
+        }
+
+        OFBarrierRequest barrierMsg = new OFBarrierRequest();
+        int xid = getNextXid();
+
+        barrierMsg.setXid(xid);
+        transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+
+        return Boolean.TRUE;
+    }
+
+    /**
+     * This method returns the switch liveness timeout value. If controller did
+     * not receive any message from the switch for such a long period,
+     * controller will tear down the connection to the switch.
+     *
+     * @return The timeout value
+     */
+    private static int getSwitchLivenessTimeout() {
+        String timeout = System.getProperty("of.switchLivenessTimeout");
+        int rv = 60500;
+
+        try {
+            if (timeout != null) {
+                rv = Integer.parseInt(timeout);
+            }
+        } catch (Exception e) {
+        }
+
+        return rv;
+    }
+
+    /**
+     * This method performs synchronous operations for a given message. If
+     * syncRequest is set to true, the message will be sent out followed by a
+     * Barrier request message. Then it's blocked until the Barrier rely arrives
+     * or timeout. If syncRequest is false, it simply skips the message send and
+     * just waits for the response back.
+     *
+     * @param msg
+     *            Message to be sent
+     * @param xid
+     *            Message XID
+     * @param request
+     *            If set to true, the message the message will be sent out
+     *            followed by a Barrier request message. If set to false, it
+     *            simply skips the sending and just waits for the Barrier reply.
+     * @return the result
+     */
+    private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+        SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+        messageWaitingDone.put(xid, worker);
+        Object result = null;
+        Boolean status = false;
+        Future<Object> submit = executor.submit(worker);
+        try {
+            result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+            messageWaitingDone.remove(xid);
+            if (result == null) {
+                // if result is null, then it means the switch can handle this
+                // message successfully
+                // convert the result into a Boolean with value true
+                status = true;
+                // logger.debug("Successfully send " +
+                // msg.getType().toString());
+                result = status;
+            } else {
+                // if result is not null, this means the switch can't handle
+                // this message
+                // the result if OFError already
+                if (logger.isDebugEnabled()) {
+                  logger.debug("Send {} failed --> {}", msg.getType(),
+                               ((OFError) result));
+                }
+            }
+            return result;
+        } catch (Exception e) {
+            logger.warn("Timeout while waiting for {} reply", msg.getType()
+                    .toString());
+            // convert the result into a Boolean with value false
+            status = false;
+            result = status;
+            return result;
+        }
     }
 }