035ba6527c70290dc580d7e91620cdd9cdb9703c
[openflowjava.git] / third-party / openflow-codec / src / main / java / org / openflow / codec / io / OFMessageAsyncStream.java
1 /**
2  *
3  */
4 package org.openflow.codec.io;
5
6 import java.io.IOException;
7 import java.nio.ByteBuffer;
8 import java.nio.channels.SocketChannel;
9 import java.util.List;
10
11 import org.openflow.codec.protocol.OFPMessage;
12 import org.openflow.codec.protocol.factory.OFPMessageFactory;
13
14 /**
15  * Asynchronous OpenFlow message marshalling and unmarshalling stream wrapped
16  * around an NIO SocketChannel
17  *
18  * @author Rob Sherwood (rob.sherwood@stanford.edu)
19  * @author David Erickson (daviderickson@cs.stanford.edu)
20  *
21  */
22 public class OFMessageAsyncStream implements OFMessageInStream, OFMessageOutStream {
23     static public int defaultBufferSize = 1048576; // 1MB
24
25     protected ByteBuffer inBuf, outBuf;
26     protected OFPMessageFactory messageFactory;
27     protected SocketChannel sock;
28     protected int partialReadCount = 0;
29
30     public OFMessageAsyncStream(SocketChannel sock, OFPMessageFactory messageFactory) throws IOException {
31         inBuf = ByteBuffer.allocateDirect(OFMessageAsyncStream.defaultBufferSize);
32         outBuf = ByteBuffer.allocateDirect(OFMessageAsyncStream.defaultBufferSize);
33         this.sock = sock;
34         this.messageFactory = messageFactory;
35         this.sock.configureBlocking(false);
36     }
37
38     @Override
39     public List<OFPMessage> read() throws IOException {
40         return this.read(0);
41     }
42
43     @Override
44     public List<OFPMessage> read(int limit) throws IOException {
45         List<OFPMessage> l;
46         int read = sock.read(inBuf);
47         if (read == -1)
48             return null;
49         inBuf.flip();
50         IDataBuffer buffer = new ByteDataBuffer(inBuf);
51         l = messageFactory.parseMessages(buffer, limit);
52         if (inBuf.hasRemaining())
53             inBuf.compact();
54         else
55             inBuf.clear();
56         return l;
57     }
58
59     protected void appendMessageToOutBuf(OFPMessage m) throws IOException {
60         int msglen = m.getLengthU();
61         if (outBuf.remaining() < msglen) {
62             throw new IOException("Message length exceeds buffer capacity: " + msglen);
63         }
64         IDataBuffer buffer = new ByteDataBuffer(outBuf);
65         m.writeTo(buffer);
66     }
67
68     /**
69      * Buffers a single outgoing openflow message
70      */
71     @Override
72     public void write(OFPMessage m) throws IOException {
73         appendMessageToOutBuf(m);
74     }
75
76     /**
77      * Buffers a list of OpenFlow messages
78      */
79     @Override
80     public void write(List<OFPMessage> l) throws IOException {
81         for (OFPMessage m : l) {
82             appendMessageToOutBuf(m);
83         }
84     }
85
86     /**
87      * Flush buffered outgoing data. Keep flushing until needsFlush() returns
88      * false. Each flush() corresponds to a SocketChannel.write(), so this is
89      * designed for one flush() per select() event
90      */
91     public void flush() throws IOException {
92         outBuf.flip(); // swap pointers; lim = pos; pos = 0;
93         sock.write(outBuf); // write data starting at pos up to lim
94         outBuf.compact();
95     }
96
97     /**
98      * Is there outgoing buffered data that needs to be flush()'d?
99      */
100     public boolean needsFlush() {
101         return outBuf.position() > 0;
102     }
103
104     /**
105      * @return the messageFactory
106      */
107     public OFPMessageFactory getMessageFactory() {
108         return messageFactory;
109     }
110
111     /**
112      * @param messageFactory
113      *            the messageFactory to set
114      */
115     public void setMessageFactory(OFPMessageFactory messageFactory) {
116         this.messageFactory = messageFactory;
117     }
118 }