4 package org.openflow.codec.io;
6 import java.io.IOException;
7 import java.nio.ByteBuffer;
8 import java.nio.channels.SocketChannel;
11 import org.openflow.codec.protocol.OFPMessage;
12 import org.openflow.codec.protocol.factory.OFPMessageFactory;
15 * Asynchronous OpenFlow message marshalling and unmarshalling stream wrapped
16 * around an NIO SocketChannel
18 * @author Rob Sherwood (rob.sherwood@stanford.edu)
19 * @author David Erickson (daviderickson@cs.stanford.edu)
22 public class OFMessageAsyncStream implements OFMessageInStream, OFMessageOutStream {
23 static public int defaultBufferSize = 1048576; // 1MB
25 protected ByteBuffer inBuf, outBuf;
26 protected OFPMessageFactory messageFactory;
27 protected SocketChannel sock;
28 protected int partialReadCount = 0;
30 public OFMessageAsyncStream(SocketChannel sock, OFPMessageFactory messageFactory) throws IOException {
31 inBuf = ByteBuffer.allocateDirect(OFMessageAsyncStream.defaultBufferSize);
32 outBuf = ByteBuffer.allocateDirect(OFMessageAsyncStream.defaultBufferSize);
34 this.messageFactory = messageFactory;
35 this.sock.configureBlocking(false);
39 public List<OFPMessage> read() throws IOException {
44 public List<OFPMessage> read(int limit) throws IOException {
46 int read = sock.read(inBuf);
50 IDataBuffer buffer = new ByteDataBuffer(inBuf);
51 l = messageFactory.parseMessages(buffer, limit);
52 if (inBuf.hasRemaining())
59 protected void appendMessageToOutBuf(OFPMessage m) throws IOException {
60 int msglen = m.getLengthU();
61 if (outBuf.remaining() < msglen) {
62 throw new IOException("Message length exceeds buffer capacity: " + msglen);
64 IDataBuffer buffer = new ByteDataBuffer(outBuf);
69 * Buffers a single outgoing OpenFlow message
72 public void write(OFPMessage m) throws IOException {
73 appendMessageToOutBuf(m);
77 * Buffers a list of OpenFlow messages
80 public void write(List<OFPMessage> l) throws IOException {
81 for (OFPMessage m : l) {
82 appendMessageToOutBuf(m);
87 * Flushes buffered outgoing data. Call flush() repeatedly until needsFlush()
88 * returns false. Each flush() corresponds to one SocketChannel.write(), so it
89 * is designed to be called once per select() event.
91 public void flush() throws IOException {
92 outBuf.flip(); // swap pointers; lim = pos; pos = 0;
93 sock.write(outBuf); // write data starting at pos up to lim
98 * Indicates whether there is buffered outgoing data that still needs to be flush()'d.
100 public boolean needsFlush() {
101 return outBuf.position() > 0;
105 * @return the messageFactory
107 public OFPMessageFactory getMessageFactory() {
108 return messageFactory;
112 * @param messageFactory
113 * the messageFactory to set
115 public void setMessageFactory(OFPMessageFactory messageFactory) {
116 this.messageFactory = messageFactory;