8e611924e41ae8b6ce5951397e002f3b5f16cf81
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / MessageReadWriteService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.channels.AsynchronousCloseException;
15 import java.nio.channels.ClosedChannelException;
16 import java.nio.channels.SelectionKey;
17 import java.nio.channels.Selector;
18 import java.nio.channels.SocketChannel;
19 import java.util.List;
20
21 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
22 import org.openflow.protocol.OFMessage;
23 import org.openflow.protocol.factory.BasicFactory;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * This class implements methods to read/write messages over an established
29  * socket channel. The data exchange is in clear text format.
30  */
31 public class MessageReadWriteService implements IMessageReadWrite {
32     private static final Logger logger = LoggerFactory
33             .getLogger(MessageReadWriteService.class);
34     private static final int bufferSize = 1024 * 1024;
35
36     private Selector selector;
37     private SelectionKey clientSelectionKey;
38     private SocketChannel socket;
39     private ByteBuffer inBuffer;
40     private ByteBuffer outBuffer;
41     private BasicFactory factory;
42
43     public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
44         this.socket = socket;
45         this.selector = selector;
46         this.factory = new BasicFactory();
47         this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
48         this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
49         this.clientSelectionKey = this.socket.register(this.selector,
50                         SelectionKey.OP_READ);
51     }
52
53         /**
54          * Sends the OF message out over the socket channel.
55          * 
56          * @param msg OF message to be sent
57          * @throws Exception
58          */
59     @Override
60     public void asyncSend(OFMessage msg) throws IOException {
61         synchronized (outBuffer) {
62                 int msgLen = msg.getLengthU();
63                 if (outBuffer.remaining() < msgLen) {
64                         // increase the buffer size so that it can contain this message
65                         ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
66                                         .capacity()
67                                         + msgLen);
68                         outBuffer.flip();
69                         newBuffer.put(outBuffer);
70                         outBuffer = newBuffer;
71                 }
72         }
73         synchronized (outBuffer) {
74                 msg.writeTo(outBuffer);
75
76                 if (!socket.isOpen()) {
77                         return;
78                 }
79
80                 outBuffer.flip();
81                 socket.write(outBuffer);
82                 outBuffer.compact();
83                 if (outBuffer.position() > 0) {
84                         this.clientSelectionKey = this.socket.register(
85                                         this.selector, SelectionKey.OP_WRITE, this);
86                 }
87                 logger.trace("Message sent: {}", msg.toString());
88         }
89     }
90
91         /**
92          * Resumes sending the remaining messages in the outgoing buffer
93          * @throws Exception
94          */
95     @Override
96     public void resumeSend() throws IOException {
97                 synchronized (outBuffer) {
98                         if (!socket.isOpen()) {
99                                 return;
100                         }
101
102                 outBuffer.flip();
103                 socket.write(outBuffer);
104                 outBuffer.compact();
105                 if (outBuffer.position() > 0) {
106                         this.clientSelectionKey = this.socket.register(
107                                         this.selector, SelectionKey.OP_WRITE, this);
108                 } else {
109                         this.clientSelectionKey = this.socket.register(
110                                         this.selector, SelectionKey.OP_READ, this);
111                 }
112         }
113     }
114
115         /**
116          * Reads the incoming network data from the socket and retrieves the OF
117          * messages.
118          * 
119          * @return list of OF messages
120          * @throws Exception
121          */
122     @Override
123     public List<OFMessage> readMessages() throws IOException {
124                 if (!socket.isOpen()) {
125                         return null;
126                 }
127
128                 List<OFMessage> msgs = null;
129         int bytesRead = -1;        
130         bytesRead = socket.read(inBuffer);
131         if (bytesRead < 0) {
132                         throw new AsynchronousCloseException();
133         }
134
135         inBuffer.flip();
136         msgs = factory.parseMessages(inBuffer);
137         if (inBuffer.hasRemaining()) {
138             inBuffer.compact();
139         } else {
140             inBuffer.clear();
141         }
142         return msgs;
143     }
144
145         @Override
146         public void stop() {
147                 inBuffer = null;
148                 outBuffer = null;
149         }
150 }