X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FMessageReadWriteService.java;h=9041004605dded75ce2fe44f129c617656ab6e60;hb=6fd408a04fe4a3611843e2246ece6d7c34b76903;hp=8e611924e41ae8b6ce5951397e002f3b5f16cf81;hpb=ab59b3b28b16f1a87d07b37cdc6dbe882d255bb5;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java index 8e611924e4..9041004605 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -9,7 +8,6 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ClosedChannelException; @@ -40,111 +38,118 @@ public class MessageReadWriteService implements IMessageReadWrite { private ByteBuffer outBuffer; private BasicFactory factory; - public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException { - this.socket = socket; - this.selector = selector; - this.factory = new BasicFactory(); - this.inBuffer = ByteBuffer.allocateDirect(bufferSize); - this.outBuffer = ByteBuffer.allocateDirect(bufferSize); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_READ); + public MessageReadWriteService(SocketChannel socket, Selector selector) + throws ClosedChannelException { + this.socket = socket; + this.selector = selector; + this.factory = new BasicFactory(); + this.inBuffer = ByteBuffer.allocateDirect(bufferSize); + this.outBuffer = ByteBuffer.allocateDirect(bufferSize); + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_READ); } - /** - * Sends the OF message out over the socket channel. - * - * @param msg OF message to be sent - * @throws Exception - */ + /** + * Sends the OF message out over the socket channel. + * + * @param msg + * OF message to be sent + * @throws Exception + */ @Override - public void asyncSend(OFMessage msg) throws IOException { - synchronized (outBuffer) { - int msgLen = msg.getLengthU(); - if (outBuffer.remaining() < msgLen) { - // increase the buffer size so that it can contain this message - ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer - .capacity() - + msgLen); - outBuffer.flip(); - newBuffer.put(outBuffer); - outBuffer = newBuffer; - } - } - synchronized (outBuffer) { - msg.writeTo(outBuffer); - - if (!socket.isOpen()) { - return; - } - - outBuffer.flip(); - socket.write(outBuffer); - outBuffer.compact(); - if (outBuffer.position() > 0) { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_WRITE, this); - } - logger.trace("Message sent: {}", msg.toString()); - } + public void asyncSend(OFMessage msg) throws Exception { + synchronized (outBuffer) { + int msgLen = msg.getLengthU(); + if (outBuffer.remaining() < msgLen) { + // increase the buffer size so that it can contain this message + ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer + .capacity() + msgLen); + outBuffer.flip(); + newBuffer.put(outBuffer); + outBuffer = newBuffer; + } + } + synchronized (outBuffer) { + msg.writeTo(outBuffer); + + if (!socket.isOpen()) { + return; + } + + outBuffer.flip(); + socket.write(outBuffer); + outBuffer.compact(); + if (outBuffer.position() > 0) { + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_WRITE, this); + } + logger.trace("Message sent: {}", msg); + } } - /** - * Resumes sending the remaining messages in the outgoing buffer - * @throws Exception - */ + /** + * Resumes sending the remaining messages in the outgoing buffer + * + * @throws Exception + */ @Override - public void resumeSend() throws IOException { - synchronized (outBuffer) { - if (!socket.isOpen()) { - return; - } - - outBuffer.flip(); - socket.write(outBuffer); - outBuffer.compact(); - if (outBuffer.position() > 0) { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_WRITE, this); - } else { - this.clientSelectionKey = this.socket.register( - this.selector, SelectionKey.OP_READ, this); - } + public void resumeSend() throws Exception { + synchronized (outBuffer) { + if (!socket.isOpen()) { + return; + } + + outBuffer.flip(); + socket.write(outBuffer); + outBuffer.compact(); + if (outBuffer.position() > 0) { + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_WRITE, this); + } else { + this.clientSelectionKey = this.socket.register(this.selector, + SelectionKey.OP_READ, this); + } } } - /** - * Reads the incoming network data from the socket and retrieves the OF - * messages. - * - * @return list of OF messages - * @throws Exception - */ + /** + * Reads the incoming network data from the socket and retrieves the OF + * messages. + * + * @return list of OF messages + * @throws Exception + */ @Override - public List readMessages() throws IOException { - if (!socket.isOpen()) { - return null; - } + public List readMessages() throws Exception { + if (!socket.isOpen()) { + return null; + } - List msgs = null; - int bytesRead = -1; + List msgs = null; + int bytesRead = -1; bytesRead = socket.read(inBuffer); if (bytesRead < 0) { - throw new AsynchronousCloseException(); + throw new AsynchronousCloseException(); } - inBuffer.flip(); - msgs = factory.parseMessages(inBuffer); - if (inBuffer.hasRemaining()) { - inBuffer.compact(); - } else { + try { + inBuffer.flip(); + msgs = factory.parseMessages(inBuffer); + if (inBuffer.hasRemaining()) { + inBuffer.compact(); + } else { + inBuffer.clear(); + } + } catch (Exception e) { inBuffer.clear(); + logger.debug("Caught exception: ", e); } return msgs; } - @Override - public void stop() { - inBuffer = null; - outBuffer = null; - } + @Override + public void stop() { + inBuffer = null; + outBuffer = null; + } }