2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11 import java.io.IOException;
12 import java.nio.ByteBuffer;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedChannelException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.util.List;
20 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
21 import org.openflow.protocol.OFMessage;
22 import org.openflow.protocol.factory.BasicFactory;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * This class implements methods to read/write messages over an established
28 * socket channel. The data exchange is in clear text format.
30 public class MessageReadWriteService implements IMessageReadWrite {
31 private static final Logger logger = LoggerFactory
32 .getLogger(MessageReadWriteService.class);
33 private static final int bufferSize = 1024 * 1024;
35 private Selector selector;
36 private SelectionKey clientSelectionKey;
37 private SocketChannel socket;
38 private ByteBuffer inBuffer;
39 private ByteBuffer outBuffer;
40 private BasicFactory factory;
/**
 * Creates a read/write service for the given socket channel and registers
 * the channel with the selector for read events.
 *
 * @param socket
 *            channel over which the messages are exchanged
 * @param selector
 *            selector the channel is registered with
 * @throws ClosedChannelException
 *             if the channel is closed when it is registered
 */
public MessageReadWriteService(SocketChannel socket, Selector selector)
        throws ClosedChannelException {
    // The channel must be stored before the registration below, which
    // goes through this.socket; every other method relies on the field too.
    this.socket = socket;
    this.selector = selector;
    this.factory = new BasicFactory();
    this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
    this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
    this.clientSelectionKey = this.socket.register(this.selector,
            SelectionKey.OP_READ);
}
54 * Sends the OF message out over the socket channel.
57 * OF message to be sent
61 public void asyncSend(OFMessage msg) throws IOException {
62 synchronized (outBuffer) {
63 int msgLen = msg.getLengthU();
64 if (outBuffer.remaining() < msgLen) {
65 // increase the buffer size so that it can contain this message
66 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
67 .capacity() + msgLen);
69 newBuffer.put(outBuffer);
70 outBuffer = newBuffer;
73 synchronized (outBuffer) {
74 msg.writeTo(outBuffer);
76 if (!socket.isOpen()) {
81 socket.write(outBuffer);
83 if (outBuffer.position() > 0) {
84 this.clientSelectionKey = this.socket.register(this.selector,
85 SelectionKey.OP_WRITE, this);
87 logger.trace("Message sent: {}", msg.toString());
92 * Resumes sending the remaining messages in the outgoing buffer
97 public void resumeSend() throws IOException {
98 synchronized (outBuffer) {
99 if (!socket.isOpen()) {
104 socket.write(outBuffer);
106 if (outBuffer.position() > 0) {
107 this.clientSelectionKey = this.socket.register(this.selector,
108 SelectionKey.OP_WRITE, this);
110 this.clientSelectionKey = this.socket.register(this.selector,
111 SelectionKey.OP_READ, this);
117 * Reads the incoming network data from the socket and retrieves the OF
120 * @return list of OF messages
124 public List<OFMessage> readMessages() throws IOException {
125 if (!socket.isOpen()) {
129 List<OFMessage> msgs = null;
131 bytesRead = socket.read(inBuffer);
133 throw new AsynchronousCloseException();
137 msgs = factory.parseMessages(inBuffer);
138 if (inBuffer.hasRemaining()) {