- Plugin sends Barrier msg every 100 async msgs (configurable thru config.ini: of...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / MessageReadWriteService.java
index 8e611924e41ae8b6ce5951397e002f3b5f16cf81..3dd99e60064e181b26941985d45017ddfab97e23 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -40,96 +39,98 @@ public class MessageReadWriteService implements IMessageReadWrite {
     private ByteBuffer outBuffer;
     private BasicFactory factory;
 
-    public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
-       this.socket = socket;
-       this.selector = selector;
-       this.factory = new BasicFactory();
-       this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
-       this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
-       this.clientSelectionKey = this.socket.register(this.selector,
-                       SelectionKey.OP_READ);
+    public MessageReadWriteService(SocketChannel socket, Selector selector)
+            throws ClosedChannelException {
+        this.socket = socket;
+        this.selector = selector;
+        this.factory = new BasicFactory();
+        this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
+        this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
+        this.clientSelectionKey = this.socket.register(this.selector,
+                SelectionKey.OP_READ);
     }
 
-       /**
-        * Sends the OF message out over the socket channel.
-        * 
-        * @param msg OF message to be sent
-        * @throws Exception
-        */
+    /**
+     * Sends the OF message out over the socket channel.
+     * 
+     * @param msg
+     *            OF message to be sent
+     * @throws Exception
+     */
     @Override
     public void asyncSend(OFMessage msg) throws IOException {
-       synchronized (outBuffer) {
-               int msgLen = msg.getLengthU();
-               if (outBuffer.remaining() < msgLen) {
-                       // increase the buffer size so that it can contain this message
-                       ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
-                                       .capacity()
-                                       + msgLen);
-                       outBuffer.flip();
-                       newBuffer.put(outBuffer);
-                       outBuffer = newBuffer;
-               }
-       }
-       synchronized (outBuffer) {
-               msg.writeTo(outBuffer);
-
-               if (!socket.isOpen()) {
-                       return;
-               }
-
-               outBuffer.flip();
-               socket.write(outBuffer);
-               outBuffer.compact();
-               if (outBuffer.position() > 0) {
-                       this.clientSelectionKey = this.socket.register(
-                                       this.selector, SelectionKey.OP_WRITE, this);
-               }
-               logger.trace("Message sent: {}", msg.toString());
-       }
+        synchronized (outBuffer) {
+            int msgLen = msg.getLengthU();
+            if (outBuffer.remaining() < msgLen) {
+                // increase the buffer size so that it can contain this message
+                ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
+                        .capacity() + msgLen);
+                outBuffer.flip();
+                newBuffer.put(outBuffer);
+                outBuffer = newBuffer;
+            }
+        }
+        synchronized (outBuffer) {
+            msg.writeTo(outBuffer);
+
+            if (!socket.isOpen()) {
+                return;
+            }
+
+            outBuffer.flip();
+            socket.write(outBuffer);
+            outBuffer.compact();
+            if (outBuffer.position() > 0) {
+                this.clientSelectionKey = this.socket.register(this.selector,
+                        SelectionKey.OP_WRITE, this);
+            }
+            logger.trace("Message sent: {}", msg.toString());
+        }
     }
 
-       /**
-        * Resumes sending the remaining messages in the outgoing buffer
-        * @throws Exception
-        */
+    /**
+     * Resumes sending the remaining messages in the outgoing buffer
+     * 
+     * @throws Exception
+     */
     @Override
     public void resumeSend() throws IOException {
-               synchronized (outBuffer) {
-                       if (!socket.isOpen()) {
-                               return;
-                       }
-
-               outBuffer.flip();
-               socket.write(outBuffer);
-               outBuffer.compact();
-               if (outBuffer.position() > 0) {
-                       this.clientSelectionKey = this.socket.register(
-                                       this.selector, SelectionKey.OP_WRITE, this);
-               } else {
-                       this.clientSelectionKey = this.socket.register(
-                                       this.selector, SelectionKey.OP_READ, this);
-               }
+        synchronized (outBuffer) {
+            if (!socket.isOpen()) {
+                return;
+            }
+
+            outBuffer.flip();
+            socket.write(outBuffer);
+            outBuffer.compact();
+            if (outBuffer.position() > 0) {
+                this.clientSelectionKey = this.socket.register(this.selector,
+                        SelectionKey.OP_WRITE, this);
+            } else {
+                this.clientSelectionKey = this.socket.register(this.selector,
+                        SelectionKey.OP_READ, this);
+            }
         }
     }
 
-       /**
-        * Reads the incoming network data from the socket and retrieves the OF
-        * messages.
-        
-        * @return list of OF messages
-        * @throws Exception
-        */
+    /**
+     * Reads the incoming network data from the socket and retrieves the OF
+     * messages.
+     * 
+     * @return list of OF messages
+     * @throws Exception
+     */
     @Override
     public List<OFMessage> readMessages() throws IOException {
-               if (!socket.isOpen()) {
-                       return null;
-               }
+        if (!socket.isOpen()) {
+            return null;
+        }
 
-               List<OFMessage> msgs = null;
-        int bytesRead = -1;        
+        List<OFMessage> msgs = null;
+        int bytesRead = -1;
         bytesRead = socket.read(inBuffer);
         if (bytesRead < 0) {
-                       throw new AsynchronousCloseException();
+            throw new AsynchronousCloseException();
         }
 
         inBuffer.flip();
@@ -142,9 +143,9 @@ public class MessageReadWriteService implements IMessageReadWrite {
         return msgs;
     }
 
-       @Override
-       public void stop() {
-               inBuffer = null;
-               outBuffer = null;
-       }
+    @Override
+    public void stop() {
+        inBuffer = null;
+        outBuffer = null;
+    }
 }