-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
private ByteBuffer outBuffer;
private BasicFactory factory;
- public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
- this.socket = socket;
- this.selector = selector;
- this.factory = new BasicFactory();
- this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
- this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ);
+ public MessageReadWriteService(SocketChannel socket, Selector selector)
+ throws ClosedChannelException {
+ this.socket = socket;
+ this.selector = selector;
+ this.factory = new BasicFactory();
+ this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
+ this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_READ);
}
- /**
- * Sends the OF message out over the socket channel.
- *
- * @param msg OF message to be sent
- * @throws Exception
- */
+ /**
+ * Sends the OF message out over the socket channel.
+ *
+ * @param msg
+ * OF message to be sent
+ * @throws Exception
+ */
@Override
public void asyncSend(OFMessage msg) throws IOException {
- synchronized (outBuffer) {
- int msgLen = msg.getLengthU();
- if (outBuffer.remaining() < msgLen) {
- // increase the buffer size so that it can contain this message
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
- .capacity()
- + msgLen);
- outBuffer.flip();
- newBuffer.put(outBuffer);
- outBuffer = newBuffer;
- }
- }
- synchronized (outBuffer) {
- msg.writeTo(outBuffer);
-
- if (!socket.isOpen()) {
- return;
- }
-
- outBuffer.flip();
- socket.write(outBuffer);
- outBuffer.compact();
- if (outBuffer.position() > 0) {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_WRITE, this);
- }
- logger.trace("Message sent: {}", msg.toString());
- }
+ synchronized (outBuffer) {
+ int msgLen = msg.getLengthU();
+ if (outBuffer.remaining() < msgLen) {
+ // increase the buffer size so that it can contain this message
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
+ .capacity() + msgLen);
+ outBuffer.flip();
+ newBuffer.put(outBuffer);
+ outBuffer = newBuffer;
+ }
+ }
+ synchronized (outBuffer) {
+ msg.writeTo(outBuffer);
+
+ if (!socket.isOpen()) {
+ return;
+ }
+
+ outBuffer.flip();
+ socket.write(outBuffer);
+ outBuffer.compact();
+ if (outBuffer.position() > 0) {
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_WRITE, this);
+ }
+ logger.trace("Message sent: {}", msg.toString());
+ }
}
- /**
- * Resumes sending the remaining messages in the outgoing buffer
- * @throws Exception
- */
+ /**
+ * Resumes sending the remaining messages in the outgoing buffer
+ *
+ * @throws Exception
+ */
@Override
public void resumeSend() throws IOException {
- synchronized (outBuffer) {
- if (!socket.isOpen()) {
- return;
- }
-
- outBuffer.flip();
- socket.write(outBuffer);
- outBuffer.compact();
- if (outBuffer.position() > 0) {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_WRITE, this);
- } else {
- this.clientSelectionKey = this.socket.register(
- this.selector, SelectionKey.OP_READ, this);
- }
+ synchronized (outBuffer) {
+ if (!socket.isOpen()) {
+ return;
+ }
+
+ outBuffer.flip();
+ socket.write(outBuffer);
+ outBuffer.compact();
+ if (outBuffer.position() > 0) {
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_WRITE, this);
+ } else {
+ this.clientSelectionKey = this.socket.register(this.selector,
+ SelectionKey.OP_READ, this);
+ }
}
}
- /**
- * Reads the incoming network data from the socket and retrieves the OF
- * messages.
- *
- * @return list of OF messages
- * @throws Exception
- */
+ /**
+ * Reads the incoming network data from the socket and retrieves the OF
+ * messages.
+ *
+ * @return list of OF messages
+ * @throws Exception
+ */
@Override
public List<OFMessage> readMessages() throws IOException {
- if (!socket.isOpen()) {
- return null;
- }
+ if (!socket.isOpen()) {
+ return null;
+ }
- List<OFMessage> msgs = null;
- int bytesRead = -1;
+ List<OFMessage> msgs = null;
+ int bytesRead = -1;
bytesRead = socket.read(inBuffer);
if (bytesRead < 0) {
- throw new AsynchronousCloseException();
+ throw new AsynchronousCloseException();
}
inBuffer.flip();
return msgs;
}
- @Override
- public void stop() {
- inBuffer = null;
- outBuffer = null;
- }
+ @Override
+ public void stop() {
+ inBuffer = null;
+ outBuffer = null;
+ }
}