3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.channels.AsynchronousCloseException;
15 import java.nio.channels.ClosedChannelException;
16 import java.nio.channels.SelectionKey;
17 import java.nio.channels.Selector;
18 import java.nio.channels.SocketChannel;
19 import java.util.List;
21 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
22 import org.openflow.protocol.OFMessage;
23 import org.openflow.protocol.factory.BasicFactory;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * This class implements methods to read/write messages over an established
29 * socket channel. The data exchange is in clear text format.
31 public class MessageReadWriteService implements IMessageReadWrite {
32 private static final Logger logger = LoggerFactory
33 .getLogger(MessageReadWriteService.class);
34 private static final int bufferSize = 1024 * 1024;
36 private Selector selector;
37 private SelectionKey clientSelectionKey;
38 private SocketChannel socket;
39 private ByteBuffer inBuffer;
40 private ByteBuffer outBuffer;
41 private BasicFactory factory;
43 public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
45 this.selector = selector;
46 this.factory = new BasicFactory();
47 this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
48 this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
49 this.clientSelectionKey = this.socket.register(this.selector,
50 SelectionKey.OP_READ);
54 * Sends the OF message out over the socket channel.
56 * @param msg OF message to be sent
60 public void asyncSend(OFMessage msg) throws IOException {
61 synchronized (outBuffer) {
62 int msgLen = msg.getLengthU();
63 if (outBuffer.remaining() < msgLen) {
64 // increase the buffer size so that it can contain this message
65 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
69 newBuffer.put(outBuffer);
70 outBuffer = newBuffer;
72 msg.writeTo(outBuffer);
74 if (!socket.isOpen()) {
79 socket.write(outBuffer);
81 if (outBuffer.position() > 0) {
82 this.clientSelectionKey = this.socket.register(
83 this.selector, SelectionKey.OP_WRITE, this);
85 logger.trace("Message sent: {}", msg.toString());
90 * Resumes sending the remaining messages in the outgoing buffer
94 public void resumeSend() throws IOException {
95 synchronized (outBuffer) {
96 if (!socket.isOpen()) {
101 socket.write(outBuffer);
103 if (outBuffer.position() > 0) {
104 this.clientSelectionKey = this.socket.register(
105 this.selector, SelectionKey.OP_WRITE, this);
107 this.clientSelectionKey = this.socket.register(
108 this.selector, SelectionKey.OP_READ, this);
114 * Reads the incoming network data from the socket and retrieves the OF
117 * @return list of OF messages
121 public List<OFMessage> readMessages() throws IOException {
122 if (!socket.isOpen()) {
126 List<OFMessage> msgs = null;
128 bytesRead = socket.read(inBuffer);
130 throw new AsynchronousCloseException();
134 msgs = factory.parseMessages(inBuffer);
135 if (inBuffer.hasRemaining()) {