Merge "Increase timeout for waiting for broker service in sal-binding-it."
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / MessageReadWriteService.java
index 8e611924e41ae8b6ce5951397e002f3b5f16cf81..9041004605dded75ce2fe44f129c617656ab6e60 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -9,7 +8,6 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedChannelException;
@@ -40,111 +38,118 @@ public class MessageReadWriteService implements IMessageReadWrite {
     private ByteBuffer outBuffer;
     private BasicFactory factory;
 
-    public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
-       this.socket = socket;
-       this.selector = selector;
-       this.factory = new BasicFactory();
-       this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
-       this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
-       this.clientSelectionKey = this.socket.register(this.selector,
-                       SelectionKey.OP_READ);
+    public MessageReadWriteService(SocketChannel socket, Selector selector)
+            throws ClosedChannelException {
+        this.socket = socket;
+        this.selector = selector;
+        this.factory = new BasicFactory();
+        this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
+        this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
+        this.clientSelectionKey = this.socket.register(this.selector,
+                SelectionKey.OP_READ);
     }
 
-       /**
-        * Sends the OF message out over the socket channel.
-        * 
-        * @param msg OF message to be sent
-        * @throws Exception
-        */
+    /**
+     * Sends the OF message out over the socket channel.
+     *
+     * @param msg
+     *            OF message to be sent
+     * @throws Exception
+     */
     @Override
-    public void asyncSend(OFMessage msg) throws IOException {
-       synchronized (outBuffer) {
-               int msgLen = msg.getLengthU();
-               if (outBuffer.remaining() < msgLen) {
-                       // increase the buffer size so that it can contain this message
-                       ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
-                                       .capacity()
-                                       + msgLen);
-                       outBuffer.flip();
-                       newBuffer.put(outBuffer);
-                       outBuffer = newBuffer;
-               }
-       }
-       synchronized (outBuffer) {
-               msg.writeTo(outBuffer);
-
-               if (!socket.isOpen()) {
-                       return;
-               }
-
-               outBuffer.flip();
-               socket.write(outBuffer);
-               outBuffer.compact();
-               if (outBuffer.position() > 0) {
-                       this.clientSelectionKey = this.socket.register(
-                                       this.selector, SelectionKey.OP_WRITE, this);
-               }
-               logger.trace("Message sent: {}", msg.toString());
-       }
+    public void asyncSend(OFMessage msg) throws Exception {
+        synchronized (outBuffer) {
+            int msgLen = msg.getLengthU();
+            if (outBuffer.remaining() < msgLen) {
+                // increase the buffer size so that it can contain this message
+                ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
+                        .capacity() + msgLen);
+                outBuffer.flip();
+                newBuffer.put(outBuffer);
+                outBuffer = newBuffer;
+            }
+        }
+        synchronized (outBuffer) {
+            msg.writeTo(outBuffer);
+
+            if (!socket.isOpen()) {
+                return;
+            }
+
+            outBuffer.flip();
+            socket.write(outBuffer);
+            outBuffer.compact();
+            if (outBuffer.position() > 0) {
+                this.clientSelectionKey = this.socket.register(this.selector,
+                        SelectionKey.OP_WRITE, this);
+            }
+            logger.trace("Message sent: {}", msg);
+        }
     }
 
-       /**
-        * Resumes sending the remaining messages in the outgoing buffer
-        * @throws Exception
-        */
+    /**
+     * Resumes sending the remaining messages in the outgoing buffer
+     *
+     * @throws Exception
+     */
     @Override
-    public void resumeSend() throws IOException {
-               synchronized (outBuffer) {
-                       if (!socket.isOpen()) {
-                               return;
-                       }
-
-               outBuffer.flip();
-               socket.write(outBuffer);
-               outBuffer.compact();
-               if (outBuffer.position() > 0) {
-                       this.clientSelectionKey = this.socket.register(
-                                       this.selector, SelectionKey.OP_WRITE, this);
-               } else {
-                       this.clientSelectionKey = this.socket.register(
-                                       this.selector, SelectionKey.OP_READ, this);
-               }
+    public void resumeSend() throws Exception {
+        synchronized (outBuffer) {
+            if (!socket.isOpen()) {
+                return;
+            }
+
+            outBuffer.flip();
+            socket.write(outBuffer);
+            outBuffer.compact();
+            if (outBuffer.position() > 0) {
+                this.clientSelectionKey = this.socket.register(this.selector,
+                        SelectionKey.OP_WRITE, this);
+            } else {
+                this.clientSelectionKey = this.socket.register(this.selector,
+                        SelectionKey.OP_READ, this);
+            }
         }
     }
 
-       /**
-        * Reads the incoming network data from the socket and retrieves the OF
-        * messages.
-        * 
-        * @return list of OF messages
-        * @throws Exception
-        */
+    /**
+     * Reads the incoming network data from the socket and retrieves the OF
+     * messages.
+     *
+     * @return list of OF messages
+     * @throws Exception
+     */
     @Override
-    public List<OFMessage> readMessages() throws IOException {
-               if (!socket.isOpen()) {
-                       return null;
-               }
+    public List<OFMessage> readMessages() throws Exception {
+        if (!socket.isOpen()) {
+            return null;
+        }
 
-               List<OFMessage> msgs = null;
-        int bytesRead = -1;        
+        List<OFMessage> msgs = null;
+        int bytesRead = -1;
         bytesRead = socket.read(inBuffer);
         if (bytesRead < 0) {
-                       throw new AsynchronousCloseException();
+            throw new AsynchronousCloseException();
         }
 
-        inBuffer.flip();
-        msgs = factory.parseMessages(inBuffer);
-        if (inBuffer.hasRemaining()) {
-            inBuffer.compact();
-        } else {
+        try {
+            inBuffer.flip();
+            msgs = factory.parseMessages(inBuffer);
+            if (inBuffer.hasRemaining()) {
+                inBuffer.compact();
+            } else {
+                inBuffer.clear();
+            }
+        } catch (Exception e) {
             inBuffer.clear();
+            logger.debug("Caught exception: ", e);
         }
         return msgs;
     }
 
-       @Override
-       public void stop() {
-               inBuffer = null;
-               outBuffer = null;
-       }
+    @Override
+    public void stop() {
+        inBuffer = null;
+        outBuffer = null;
+    }
 }