Moving a recently added trim() call in SwitchHandler after Null check.
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
index 8881fb53640caa58e692a08bdbdbb7bbb80916e6..cba8b1d4f17c38a9d232e1b54a3463c96cfe8dbf 100644 (file)
@@ -9,12 +9,14 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,11 +30,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
 import org.openflow.protocol.OFBarrierReply;
 import org.openflow.protocol.OFEchoReply;
 import org.openflow.protocol.OFError;
@@ -63,7 +67,6 @@ public class SwitchHandler implements ISwitch {
     private static final int SWITCH_LIVENESS_TIMER = 5000;
     private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
     private int MESSAGE_RESPONSE_TIMER = 2000;
-    private static final int bufferSize = 1024 * 1024;
 
     private String instanceName;
     private ISwitch thisISwitch;
@@ -74,10 +77,7 @@ public class SwitchHandler implements ISwitch {
     private Byte tables;
     private Integer actions;
     private Selector selector;
-    private SelectionKey clientSelectionKey;
     private SocketChannel socket;
-    private ByteBuffer inBuffer;
-    private ByteBuffer outBuffer;
     private BasicFactory factory;
     private AtomicInteger xid;
     private SwitchState state;
@@ -90,9 +90,12 @@ public class SwitchHandler implements ISwitch {
     private ExecutorService executor;
     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
     private boolean running;
+    private IMessageReadWrite msgReadWriteService;
     private Thread switchHandlerThread;
     private Integer responseTimerValue;
-
+       private PriorityBlockingQueue<PriorityMessage> transmitQ;
+    private Thread transmitThread;
+    
     private enum SwitchState {
         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
                 3);
@@ -130,44 +133,37 @@ public class SwitchHandler implements ISwitch {
         this.periodicTimer = null;
         this.executor = Executors.newFixedThreadPool(4);
         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
-        this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
-        this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
         String rTimer = System.getProperty("of.messageResponseTimer");
         if (rTimer != null) {
-            try {
-                responseTimerValue = Integer.decode(rTimer);
-            } catch (NumberFormatException e) {
-                logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default("
-                        + MESSAGE_RESPONSE_TIMER+ ")");
-            }
+               try {
+                       responseTimerValue = Integer.decode(rTimer);
+               } catch (NumberFormatException e) {
+                               logger.warn("Invalid of.messageResponseTimer: {} use default({})",
+                                               rTimer, MESSAGE_RESPONSE_TIMER);
+               }
         }
-    }
+       }
 
     public void start() {
         try {
-            this.selector = SelectorProvider.provider().openSelector();
-            this.socket.configureBlocking(false);
-            this.socket.socket().setTcpNoDelay(true);
-            this.clientSelectionKey = this.socket.register(this.selector,
-                    SelectionKey.OP_READ);
+               startTransmitThread();
+               setupCommChannel();
+               sendFirstHello();
             startHandlerThread();
         } catch (Exception e) {
             reportError(e);
-            return;
         }
     }
 
     private void startHandlerThread() {
-        OFMessage msg = factory.getMessage(OFType.HELLO);
-        asyncSend(msg);
         switchHandlerThread = new Thread(new Runnable() {
             @Override
             public void run() {
                 running = true;
                 while (running) {
                     try {
-                        // wait for an incoming connection
+                       // wait for an incoming connection
                         selector.select(0);
                         Iterator<SelectionKey> selectedKeys = selector
                                 .selectedKeys().iterator();
@@ -191,17 +187,33 @@ public class SwitchHandler implements ISwitch {
     }
 
     public void stop() {
-        try {
-            running = false;
-            selector.wakeup();
-            cancelSwitchTimer();
-            this.clientSelectionKey.cancel();
-            this.socket.close();
-            executor.shutdown();
-        } catch (Exception e) {
-               // do nothing since we are shutting down.
-               return;
-        }
+       running = false;
+       cancelSwitchTimer();
+       try {
+               selector.wakeup();
+               selector.close();
+               } catch (Exception e) {
+               }
+       try {
+                       socket.close();
+               } catch (Exception e) {
+               }
+       try {
+                       msgReadWriteService.stop();
+               } catch (Exception e) {
+               }
+       executor.shutdown();
+       
+       selector = null;
+       socket = null;
+               msgReadWriteService = null;
+               
+               if (switchHandlerThread != null) {
+                       switchHandlerThread.interrupt();
+               }
+               if (transmitThread != null) {
+                       transmitThread.interrupt();
+               }
     }
 
     @Override
@@ -209,77 +221,94 @@ public class SwitchHandler implements ISwitch {
         return this.xid.incrementAndGet();
     }
 
+       /**
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. An unique XID is generated automatically and
+        * inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
     @Override
     public Integer asyncSend(OFMessage msg) {
-        return asyncSend(msg, getNextXid());
+       return asyncSend(msg, getNextXid());
+    }
+
+       /**
+        * This method puts the message in an outgoing priority queue with normal
+        * priority. It will be served after high priority messages. The method
+        * should be used for non-critical messages such as statistics request,
+        * discovery packets, etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be Sent
+        * @param xid The XID to be used in the message
+        * @return The XID used
+        */
+    @Override
+    public Integer asyncSend(OFMessage msg, int xid) {
+       msg.setXid(xid);
+       transmitQ.add(new PriorityMessage(msg, 0));
+        return xid;
     }
 
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. An unique XID is generated automatically and inserted into the
+        * message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
     @Override
-    public Integer asyncSend(OFMessage msg, int xid) {
-        synchronized (outBuffer) {
-            /*
-            if ((msg.getType() != OFType.ECHO_REQUEST) &&
-                       (msg.getType() != OFType.ECHO_REPLY)) {
-               logger.debug("sending " + msg.getType().toString() + " to " + toString());
-            }
-             */
-            msg.setXid(xid);
-            int msgLen = msg.getLengthU();
-            if (outBuffer.remaining() < msgLen) {
-                // increase the buffer size so that it can contain this message
-                ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
-                        .capacity()
-                        + msgLen);
-                outBuffer.flip();
-                newBuffer.put(outBuffer);
-                outBuffer = newBuffer;
-            }
-            msg.writeTo(outBuffer);
-            outBuffer.flip();
-            try {
-                socket.write(outBuffer);
-                outBuffer.compact();
-                if (outBuffer.position() > 0) {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_WRITE, this);
-                }
-                logger.trace("Message sent: " + msg.toString());
-            } catch (Exception e) {
-                reportError(e);
-            }
-        }
+    public Integer asyncFastSend(OFMessage msg) {
+       return asyncFastSend(msg, getNextXid());
+    }
+
+       /**
+        * This method puts the message in an outgoing priority queue with high
+        * priority. It will be served first before normal priority messages. The
+        * method should be used for critical messages such as hello, echo reply
+        * etc. The specified XID is inserted into the message.
+        * 
+        * @param msg The OF message to be sent
+        * @return The XID used
+        */
+    @Override
+    public Integer asyncFastSend(OFMessage msg, int xid) {
+       msg.setXid(xid);
+       transmitQ.add(new PriorityMessage(msg, 1));
         return xid;
     }
 
-    public void resumeSend() {
-        synchronized (outBuffer) {
-            try {
-                outBuffer.flip();
-                socket.write(outBuffer);
-                outBuffer.compact();
-                if (outBuffer.position() > 0) {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_WRITE, this);
-                } else {
-                    this.clientSelectionKey = this.socket.register(
-                            this.selector, SelectionKey.OP_READ, this);
-                }
-            } catch (Exception e) {
-                reportError(e);
-            }
-        }
+   public void resumeSend() {
+        try {
+                       msgReadWriteService.resumeSend();
+               } catch (Exception e) {
+                       reportError(e);
+               }
     }
 
     public void handleMessages() {
-        List<OFMessage> msgs = readMessages();
+        List<OFMessage> msgs = null;
+        
+        try {
+               msgs = msgReadWriteService.readMessages();
+               } catch (Exception e) {
+                       reportError(e);
+               }
+               
         if (msgs == null) {
-            logger.debug(toString() + " is down");
+            logger.debug("{} is down", toString());
             // the connection is down, inform core
             reportSwitchStateChange(false);
             return;
         }
         for (OFMessage msg : msgs) {
-            logger.trace("Message received: " + msg.toString());
+            logger.trace("Message received: {}", msg.toString());
             /*
             if  ((msg.getType() != OFType.ECHO_REQUEST) &&
                        (msg.getType() != OFType.ECHO_REPLY)) {
@@ -293,7 +322,7 @@ public class SwitchHandler implements ISwitch {
                 // send feature request
                 OFMessage featureRequest = factory
                         .getMessage(OFType.FEATURES_REQUEST);
-                asyncSend(featureRequest);
+                asyncFastSend(featureRequest);
                 // delete all pre-existing flows
                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
                 OFFlowMod flowMod = (OFFlowMod) factory
@@ -301,14 +330,14 @@ public class SwitchHandler implements ISwitch {
                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
                         .setOutPort(OFPort.OFPP_NONE).setLength(
                                 (short) OFFlowMod.MINIMUM_LENGTH);
-                asyncSend(flowMod);
+                asyncFastSend(flowMod);
                 this.state = SwitchState.WAIT_FEATURES_REPLY;
                 startSwitchTimer();
                 break;
             case ECHO_REQUEST:
                 OFEchoReply echoReply = (OFEchoReply) factory
                         .getMessage(OFType.ECHO_REPLY);
-                asyncSend(echoReply);
+                asyncFastSend(echoReply);
                 break;
             case ECHO_REPLY:
                 this.probeSent = false;
@@ -362,28 +391,6 @@ public class SwitchHandler implements ISwitch {
 
     }
 
-    private List<OFMessage> readMessages() {
-        List<OFMessage> msgs = null;
-        int bytesRead;
-        try {
-            bytesRead = socket.read(inBuffer);
-        } catch (Exception e) {
-            reportError(e);
-            return null;
-        }
-        if (bytesRead == -1) {
-            return null;
-        }
-        inBuffer.flip();
-        msgs = factory.parseMessages(inBuffer);
-        if (inBuffer.hasRemaining()) {
-            inBuffer.compact();
-        } else {
-            inBuffer.clear();
-        }
-        return msgs;
-    }
-
     private void startSwitchTimer() {
         this.periodicTimer = new Timer();
         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
@@ -394,8 +401,7 @@ public class SwitchHandler implements ISwitch {
                     if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
                         if (probeSent) {
                             // switch failed to respond to our probe, consider it down
-                            logger.warn(toString()
-                                    + " is idle for too long, disconnect");
+                            logger.warn("{} is idle for too long, disconnect", toString());
                             reportSwitchStateChange(false);
                         } else {
                             // send a probe to see if the switch is still alive
@@ -403,14 +409,14 @@ public class SwitchHandler implements ISwitch {
                             probeSent = true;
                             OFMessage echo = factory
                                     .getMessage(OFType.ECHO_REQUEST);
-                            asyncSend(echo);
+                            asyncFastSend(echo);
                         }
                     } else {
                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
                             // send another features request
                             OFMessage request = factory
                                     .getMessage(OFType.FEATURES_REQUEST);
-                            asyncSend(request);
+                            asyncFastSend(request);
                         } else {
                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
                                 //  send another config request
@@ -418,10 +424,10 @@ public class SwitchHandler implements ISwitch {
                                         .getMessage(OFType.SET_CONFIG);
                                 config.setMissSendLength((short) 0xffff)
                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
-                                asyncSend(config);
+                                asyncFastSend(config);
                                 OFMessage getConfig = factory
                                         .getMessage(OFType.GET_CONFIG_REQUEST);
-                                asyncSend(getConfig);
+                                asyncFastSend(getConfig);
                             }
                         }
                     }
@@ -439,8 +445,12 @@ public class SwitchHandler implements ISwitch {
     }
 
     private void reportError(Exception e) {
-        //logger.error(toString() + " caught Error " + e.toString());
-        // notify core of this error event
+       if (e instanceof AsynchronousCloseException) {
+               logger.debug("Caught exception {}", e.getMessage());
+       } else {
+               logger.warn("Caught exception {}", e.getMessage());
+       }
+        // notify core of this error event and disconnect the switch
         ((Controller) core).takeSwitchEventError(this);
     }
 
@@ -473,10 +483,10 @@ public class SwitchHandler implements ISwitch {
                     .getMessage(OFType.SET_CONFIG);
             config.setMissSendLength((short) 0xffff).setLengthU(
                     OFSetConfig.MINIMUM_LENGTH);
-            asyncSend(config);
+            asyncFastSend(config);
             // send config request to make sure the switch can handle the set config
             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
-            asyncSend(getConfig);
+            asyncFastSend(getConfig);
             this.state = SwitchState.WAIT_CONFIG_REPLY;
             // inform core that a new switch is now operational
             reportSwitchStateChange(true);
@@ -545,8 +555,7 @@ public class SwitchHandler implements ISwitch {
                     .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for " + req.getType()
-                    + " replies");
+            logger.warn("Timeout while waiting for {} replies", req.getType());
             result = null; // to indicate timeout has occurred
             return result;
         }
@@ -573,13 +582,12 @@ public class SwitchHandler implements ISwitch {
             } else {
                 // if result  is not null, this means the switch can't handle this message
                 // the result if OFError already
-                logger.debug("Send " + msg.getType().toString()
-                        + " failed --> " + ((OFError) result).toString());
+                logger.debug("Send {} failed --> {}", 
+                               msg.getType().toString(), ((OFError) result).toString());
             }
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for " + msg.getType().toString()
-                    + " reply");
+            logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
             // convert the result into a Boolean with value false
             status = false;
             result = status;
@@ -637,7 +645,7 @@ public class SwitchHandler implements ISwitch {
             worker.wakeup();
         }
     }
-
+    
     @Override
     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
         return this.physicalPorts;
@@ -715,4 +723,67 @@ public class SwitchHandler implements ISwitch {
         }
         return result;
     }
+
+       /*
+        * Transmit thread polls the message out of the priority queue and invokes
+        * messaging service to transmit it over the socket channel
+        */
+    class PriorityMessageTransmit implements Runnable {
+        public void run() {
+            running = true;
+            while (running) {
+               try {
+                       if (!transmitQ.isEmpty()) {
+                               PriorityMessage pmsg = transmitQ.poll();
+                               msgReadWriteService.asyncSend(pmsg.msg);
+                               logger.trace("Message sent: {}", pmsg.toString());
+                       }
+                       Thread.sleep(10);
+               } catch (Exception e) {
+                       reportError(e);
+               }
+            }
+               transmitQ = null;
+        }
+    }
+
+    /*
+     * Setup and start the transmit thread
+     */
+    private void startTransmitThread() {       
+        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, 
+                               new Comparator<PriorityMessage>() {
+                                       public int compare(PriorityMessage p1, PriorityMessage p2) {
+                                               return p2.priority - p1.priority;
+                                       }
+                               });
+        this.transmitThread = new Thread(new PriorityMessageTransmit());
+        this.transmitThread.start();
+    }
+    
+    /*
+     * Setup communication services
+     */
+    private void setupCommChannel() throws Exception {
+        this.selector = SelectorProvider.provider().openSelector();
+        this.socket.configureBlocking(false);
+        this.socket.socket().setTcpNoDelay(true);        
+        this.msgReadWriteService = getMessageReadWriteService();
+    }
+
+    private void sendFirstHello() {
+       try {
+               OFMessage msg = factory.getMessage(OFType.HELLO);
+               asyncFastSend(msg);
+       } catch (Exception e) {
+               reportError(e);
+       }
+    }
+    
+    private IMessageReadWrite getMessageReadWriteService() throws Exception {
+       String str = System.getProperty("secureChannelEnabled");
+        return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? 
+                       new SecureMessageReadWriteService(socket, selector) : 
+                       new MessageReadWriteService(socket, selector);
+    }
 }