4 package org.openflow.io;
6 import java.io.IOException;
7 import java.nio.ByteBuffer;
8 import java.nio.channels.SocketChannel;
11 import org.openflow.protocol.OFMessage;
12 import org.openflow.protocol.factory.OFMessageFactory;
15 * Asynchronous OpenFlow message marshalling and unmarshalling stream wrapped
16 * around an NIO SocketChannel
18 * @author Rob Sherwood (rob.sherwood@stanford.edu)
19 * @author David Erickson (daviderickson@cs.stanford.edu)
22 public class OFMessageAsyncStream implements OFMessageInStream,
// Size in bytes (1048576 = 1 MiB) passed to both ByteBuffer.allocateDirect() calls in the
// constructor. Declared public, static and non-final, so it can be reassigned from outside this class.
24 static public int defaultBufferSize = 1048576; // 1MB
// inBuf: buffer that read(int) fills from the socket and hands to the message factory for parsing.
// outBuf: buffer whose free space appendMessageToOutBuf() checks and which flush() writes to the socket.
26 protected ByteBuffer inBuf, outBuf;
// Factory whose parseMessages() is called by read(int); replaceable through setMessageFactory().
27 protected OFMessageFactory messageFactory;
// Channel that read(int) reads from and flush() writes to.
28 protected SocketChannel sock;
// NOTE(review): not read or written anywhere in the visible part of this file - confirm whether it is still used.
29 protected int partialReadCount = 0;
31 public OFMessageAsyncStream(SocketChannel sock,
32 OFMessageFactory messageFactory) throws IOException {
// Creates a stream for the given channel and message factory.
// Declares IOException; of the visible calls, configureBlocking() is the one that can throw it.
// NOTE(review): this listing skips source lines (numbering jumps 32 -> 34 -> 36 -> 38), so the
// receivers of the two allocateDirect() calls and the assignment of this.sock are not visible here.
// Two direct buffers of defaultBufferSize bytes each are allocated:
34 .allocateDirect(OFMessageAsyncStream.defaultBufferSize);
36 .allocateDirect(OFMessageAsyncStream.defaultBufferSize);
38 this.messageFactory = messageFactory;
// Switch the channel to non-blocking mode.
39 this.sock.configureBlocking(false);
43 public List<OFMessage> read() throws IOException {
// No-argument overload of read(int); returns a list of OFMessage and may throw IOException.
// NOTE(review): the body of this overload is missing from this listing (numbering jumps 43 -> 48);
// presumably it delegates to read(int) with a default limit - confirm against the full file.
48 public List<OFMessage> read(int limit) throws IOException {
// Reads from the socket into inBuf, then asks the message factory to parse messages out of
// inBuf. limit is passed through to parseMessages() unchanged; its meaning is defined there.
// NOTE(review): source lines are skipped in this listing (50 -> 54, and everything after 55), so
// the declaration of l, the handling of the read count, any buffer flip, both branch bodies and
// the return statement are not visible here.
// Byte count reported by SocketChannel.read (per its API, -1 signals end-of-stream).
50 int read = sock.read(inBuf);
54 l = messageFactory.parseMessages(inBuf, limit);
// Branch taken when inBuf still has bytes between its position and limit after parsing.
55 if (inBuf.hasRemaining())
62 protected void appendMessageToOutBuf(OFMessage m) throws IOException {
// Verifies that message m fits into the free space of outBuf.
// Throws IOException when m.getLengthU() is larger than outBuf.remaining().
// NOTE(review): the lines after the throw (67 onward) are missing from this listing, so the
// statement that actually copies m into outBuf is not visible here.
63 int msglen = m.getLengthU();
64 if (outBuf.remaining() < msglen) {
// The text says "buffer capacity", but the comparison above is against the space currently
// remaining in outBuf, so this can fire for a message smaller than the buffer's total capacity.
65 throw new IOException(
66 "Message length exceeds buffer capacity: " + msglen);
72 * Buffers a single outgoing openflow message
75 public void write(OFMessage m) throws IOException {
// Delegates to appendMessageToOutBuf(), which throws IOException when m does not fit in outBuf.
// No socket write is visible in this method; sock.write() is called from flush().
76 appendMessageToOutBuf(m);
80 * Buffers a list of OpenFlow messages
83 public void write(List<OFMessage> l) throws IOException {
// Hands each message to appendMessageToOutBuf() in the list's iteration order. The first
// message that does not fit raises IOException and ends the loop.
// NOTE(review): messages processed before the failing one presumably stay buffered - confirm.
84 for (OFMessage m : l) {
85 appendMessageToOutBuf(m);
90 * Flush buffered outgoing data. Keep flushing until needsFlush() returns
91 * false. Each flush() corresponds to a SocketChannel.write(), so this is
92 * designed for one flush() per select() event
94 public void flush() throws IOException {
// Issues exactly one sock.write(); the byte count it returns is discarded, so this method does
// not itself report whether all buffered data was sent.
95 outBuf.flip(); // swap pointers; lim = pos; pos = 0;
96 sock.write(outBuf); // write data starting at pos up to lim
// NOTE(review): the lines after 96 are missing from this listing. The step that returns outBuf
// to write mode after a possibly partial socket write is not visible - confirm against the full file.
101 * Is there outgoing buffered data that needs to be flush()'d?
103 public boolean needsFlush() {
// True when outBuf's position is greater than zero.
// NOTE(review): this is only meaningful if outBuf is in write mode (position = bytes buffered)
// whenever it is called - confirm that flush() leaves the buffer in that state.
104 return outBuf.position() > 0;
108 * @return the messageFactory
110 public OFMessageFactory getMessageFactory() {
// Returns the factory reference currently held in the messageFactory field (no copy is made).
111 return messageFactory;
115 * @param messageFactory
116 * the messageFactory to set
118 public void setMessageFactory(OFMessageFactory messageFactory) {
// Replaces the factory that read(int) uses for parsing. No null check is performed here.
119 this.messageFactory = messageFactory;