- Plugin sends Barrier msg every 100 async msgs (configurable thru config.ini: of...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / MessageReadWriteService.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.nio.ByteBuffer;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedChannelException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.util.List;
19
20 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
21 import org.openflow.protocol.OFMessage;
22 import org.openflow.protocol.factory.BasicFactory;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * This class implements methods to read/write messages over an established
28  * socket channel. The data exchange is in clear text format.
29  */
30 public class MessageReadWriteService implements IMessageReadWrite {
31     private static final Logger logger = LoggerFactory
32             .getLogger(MessageReadWriteService.class);
33     private static final int bufferSize = 1024 * 1024;
34
35     private Selector selector;
36     private SelectionKey clientSelectionKey;
37     private SocketChannel socket;
38     private ByteBuffer inBuffer;
39     private ByteBuffer outBuffer;
40     private BasicFactory factory;
41
42     public MessageReadWriteService(SocketChannel socket, Selector selector)
43             throws ClosedChannelException {
44         this.socket = socket;
45         this.selector = selector;
46         this.factory = new BasicFactory();
47         this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
48         this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
49         this.clientSelectionKey = this.socket.register(this.selector,
50                 SelectionKey.OP_READ);
51     }
52
53     /**
54      * Sends the OF message out over the socket channel.
55      * 
56      * @param msg
57      *            OF message to be sent
58      * @throws Exception
59      */
60     @Override
61     public void asyncSend(OFMessage msg) throws IOException {
62         synchronized (outBuffer) {
63             int msgLen = msg.getLengthU();
64             if (outBuffer.remaining() < msgLen) {
65                 // increase the buffer size so that it can contain this message
66                 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
67                         .capacity() + msgLen);
68                 outBuffer.flip();
69                 newBuffer.put(outBuffer);
70                 outBuffer = newBuffer;
71             }
72         }
73         synchronized (outBuffer) {
74             msg.writeTo(outBuffer);
75
76             if (!socket.isOpen()) {
77                 return;
78             }
79
80             outBuffer.flip();
81             socket.write(outBuffer);
82             outBuffer.compact();
83             if (outBuffer.position() > 0) {
84                 this.clientSelectionKey = this.socket.register(this.selector,
85                         SelectionKey.OP_WRITE, this);
86             }
87             logger.trace("Message sent: {}", msg.toString());
88         }
89     }
90
91     /**
92      * Resumes sending the remaining messages in the outgoing buffer
93      * 
94      * @throws Exception
95      */
96     @Override
97     public void resumeSend() throws IOException {
98         synchronized (outBuffer) {
99             if (!socket.isOpen()) {
100                 return;
101             }
102
103             outBuffer.flip();
104             socket.write(outBuffer);
105             outBuffer.compact();
106             if (outBuffer.position() > 0) {
107                 this.clientSelectionKey = this.socket.register(this.selector,
108                         SelectionKey.OP_WRITE, this);
109             } else {
110                 this.clientSelectionKey = this.socket.register(this.selector,
111                         SelectionKey.OP_READ, this);
112             }
113         }
114     }
115
116     /**
117      * Reads the incoming network data from the socket and retrieves the OF
118      * messages.
119      * 
120      * @return list of OF messages
121      * @throws Exception
122      */
123     @Override
124     public List<OFMessage> readMessages() throws IOException {
125         if (!socket.isOpen()) {
126             return null;
127         }
128
129         List<OFMessage> msgs = null;
130         int bytesRead = -1;
131         bytesRead = socket.read(inBuffer);
132         if (bytesRead < 0) {
133             throw new AsynchronousCloseException();
134         }
135
136         inBuffer.flip();
137         msgs = factory.parseMessages(inBuffer);
138         if (inBuffer.hasRemaining()) {
139             inBuffer.compact();
140         } else {
141             inBuffer.clear();
142         }
143         return msgs;
144     }
145
146     @Override
147     public void stop() {
148         inBuffer = null;
149         outBuffer = null;
150     }
151 }