3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.channels.AsynchronousCloseException;
15 import java.nio.channels.ClosedChannelException;
16 import java.nio.channels.SelectionKey;
17 import java.nio.channels.Selector;
18 import java.nio.channels.SocketChannel;
19 import java.util.List;
21 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
22 import org.openflow.protocol.OFMessage;
23 import org.openflow.protocol.factory.BasicFactory;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * This class implements methods to read/write messages over an established
29 * socket channel. The data exchange is in clear text format.
31 public class MessageReadWriteService implements IMessageReadWrite {
32 private static final Logger logger = LoggerFactory
33 .getLogger(MessageReadWriteService.class);
34 private static final int bufferSize = 1024 * 1024;
36 private Selector selector;
37 private SelectionKey clientSelectionKey;
38 private SocketChannel socket;
39 private ByteBuffer inBuffer;
40 private ByteBuffer outBuffer;
41 private BasicFactory factory;
43 public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
45 this.selector = selector;
46 this.factory = new BasicFactory();
47 this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
48 this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
49 this.clientSelectionKey = this.socket.register(this.selector,
50 SelectionKey.OP_READ);
54 * Sends the OF message out over the socket channel.
56 * @param msg OF message to be sent
60 public void asyncSend(OFMessage msg) throws IOException {
61 synchronized (outBuffer) {
62 int msgLen = msg.getLengthU();
63 if (outBuffer.remaining() < msgLen) {
64 // increase the buffer size so that it can contain this message
65 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
69 newBuffer.put(outBuffer);
70 outBuffer = newBuffer;
73 synchronized (outBuffer) {
74 msg.writeTo(outBuffer);
76 if (!socket.isOpen()) {
81 socket.write(outBuffer);
83 if (outBuffer.position() > 0) {
84 this.clientSelectionKey = this.socket.register(
85 this.selector, SelectionKey.OP_WRITE, this);
87 logger.trace("Message sent: {}", msg.toString());
92 * Resumes sending the remaining messages in the outgoing buffer
96 public void resumeSend() throws IOException {
97 synchronized (outBuffer) {
98 if (!socket.isOpen()) {
103 socket.write(outBuffer);
105 if (outBuffer.position() > 0) {
106 this.clientSelectionKey = this.socket.register(
107 this.selector, SelectionKey.OP_WRITE, this);
109 this.clientSelectionKey = this.socket.register(
110 this.selector, SelectionKey.OP_READ, this);
116 * Reads the incoming network data from the socket and retrieves the OF
119 * @return list of OF messages
123 public List<OFMessage> readMessages() throws IOException {
124 if (!socket.isOpen()) {
128 List<OFMessage> msgs = null;
130 bytesRead = socket.read(inBuffer);
132 throw new AsynchronousCloseException();
136 msgs = factory.parseMessages(inBuffer);
137 if (inBuffer.hasRemaining()) {