2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11 import java.nio.ByteBuffer;
12 import java.nio.channels.AsynchronousCloseException;
13 import java.nio.channels.ClosedChannelException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.util.List;
19 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
20 import org.openflow.protocol.OFMessage;
21 import org.openflow.protocol.factory.BasicFactory;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
26 * This class implements methods to read/write messages over an established
27 * socket channel. The data exchange is in clear text format.
29 public class MessageReadWriteService implements IMessageReadWrite {
30 private static final Logger logger = LoggerFactory
31 .getLogger(MessageReadWriteService.class);
32 private static final int bufferSize = 1024 * 1024;
34 private Selector selector;
35 private SelectionKey clientSelectionKey;
36 private SocketChannel socket;
37 private ByteBuffer inBuffer;
38 private ByteBuffer outBuffer;
39 private BasicFactory factory;
/**
 * Creates a read/write service on top of the given socket channel and
 * registers the channel with the selector for read events.
 *
 * @param socket
 *            socket channel the messages are exchanged over
 * @param selector
 *            selector the channel gets registered with
 * @throws ClosedChannelException
 *             if the channel is closed when it is registered
 */
public MessageReadWriteService(SocketChannel socket, Selector selector)
        throws ClosedChannelException {
    // The channel has to be stored before the register() call below,
    // which dereferences this.socket.
    this.socket = socket;
    this.selector = selector;
    this.factory = new BasicFactory();
    this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
    this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
    this.clientSelectionKey = this.socket.register(this.selector,
            SelectionKey.OP_READ);
}
53 * Sends the OF message out over the socket channel.
56 * OF message to be sent
60 public void asyncSend(OFMessage msg) throws Exception {
61 synchronized (outBuffer) {
62 int msgLen = msg.getLengthU();
63 if (outBuffer.remaining() < msgLen) {
64 // increase the buffer size so that it can contain this message
65 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
66 .capacity() + msgLen);
68 newBuffer.put(outBuffer);
69 outBuffer = newBuffer;
72 synchronized (outBuffer) {
73 msg.writeTo(outBuffer);
75 if (!socket.isOpen()) {
80 socket.write(outBuffer);
82 if (outBuffer.position() > 0) {
83 this.clientSelectionKey = this.socket.register(this.selector,
84 SelectionKey.OP_WRITE, this);
86 logger.trace("Message sent: {}", msg);
91 * Resumes sending the remaining messages in the outgoing buffer
96 public void resumeSend() throws Exception {
97 synchronized (outBuffer) {
98 if (!socket.isOpen()) {
103 socket.write(outBuffer);
105 if (outBuffer.position() > 0) {
106 this.clientSelectionKey = this.socket.register(this.selector,
107 SelectionKey.OP_WRITE, this);
109 this.clientSelectionKey = this.socket.register(this.selector,
110 SelectionKey.OP_READ, this);
116 * Reads the incoming network data from the socket and retrieves the OF
119 * @return list of OF messages
123 public List<OFMessage> readMessages() throws Exception {
124 if (!socket.isOpen()) {
128 List<OFMessage> msgs = null;
130 bytesRead = socket.read(inBuffer);
132 throw new AsynchronousCloseException();
137 msgs = factory.parseMessages(inBuffer);
138 if (inBuffer.hasRemaining()) {
143 } catch (Exception e) {
145 logger.debug("Caught exception: ", e);