Add TLS support in the Opendaylight Controller:
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / MessageReadWriteService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.channels.AsynchronousCloseException;
15 import java.nio.channels.ClosedChannelException;
16 import java.nio.channels.SelectionKey;
17 import java.nio.channels.Selector;
18 import java.nio.channels.SocketChannel;
19 import java.util.List;
20
21 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
22 import org.openflow.protocol.OFMessage;
23 import org.openflow.protocol.factory.BasicFactory;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * This class implements methods to read/write messages over an established
29  * socket channel. The data exchange is in clear text format.
30  */
31 public class MessageReadWriteService implements IMessageReadWrite {
32     private static final Logger logger = LoggerFactory
33             .getLogger(MessageReadWriteService.class);
34     private static final int bufferSize = 1024 * 1024;
35
36     private Selector selector;
37     private SelectionKey clientSelectionKey;
38     private SocketChannel socket;
39     private ByteBuffer inBuffer;
40     private ByteBuffer outBuffer;
41     private BasicFactory factory;
42
43     public MessageReadWriteService(SocketChannel socket, Selector selector) throws ClosedChannelException {
44         this.socket = socket;
45         this.selector = selector;
46         this.factory = new BasicFactory();
47         this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
48         this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
49         this.clientSelectionKey = this.socket.register(this.selector,
50                         SelectionKey.OP_READ);
51     }
52
53         /**
54          * Sends the OF message out over the socket channel.
55          * 
56          * @param msg OF message to be sent
57          * @throws Exception
58          */
59     @Override
60     public void asyncSend(OFMessage msg) throws IOException {
61         synchronized (outBuffer) {
62                 int msgLen = msg.getLengthU();
63                 if (outBuffer.remaining() < msgLen) {
64                         // increase the buffer size so that it can contain this message
65                         ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
66                                         .capacity()
67                                         + msgLen);
68                         outBuffer.flip();
69                         newBuffer.put(outBuffer);
70                         outBuffer = newBuffer;
71                 }
72                 msg.writeTo(outBuffer);
73
74                 if (!socket.isOpen()) {
75                         return;
76                 }
77
78                 outBuffer.flip();
79                 socket.write(outBuffer);
80                 outBuffer.compact();
81                 if (outBuffer.position() > 0) {
82                         this.clientSelectionKey = this.socket.register(
83                                         this.selector, SelectionKey.OP_WRITE, this);
84                 }
85                 logger.trace("Message sent: {}", msg.toString());
86         }
87     }
88
89         /**
90          * Resumes sending the remaining messages in the outgoing buffer
91          * @throws Exception
92          */
93     @Override
94     public void resumeSend() throws IOException {
95                 synchronized (outBuffer) {
96                         if (!socket.isOpen()) {
97                                 return;
98                         }
99
100                 outBuffer.flip();
101                 socket.write(outBuffer);
102                 outBuffer.compact();
103                 if (outBuffer.position() > 0) {
104                         this.clientSelectionKey = this.socket.register(
105                                         this.selector, SelectionKey.OP_WRITE, this);
106                 } else {
107                         this.clientSelectionKey = this.socket.register(
108                                         this.selector, SelectionKey.OP_READ, this);
109                 }
110         }
111     }
112
113         /**
114          * Reads the incoming network data from the socket and retrieves the OF
115          * messages.
116          * 
117          * @return list of OF messages
118          * @throws Exception
119          */
120     @Override
121     public List<OFMessage> readMessages() throws IOException {
122                 if (!socket.isOpen()) {
123                         return null;
124                 }
125
126                 List<OFMessage> msgs = null;
127         int bytesRead = -1;        
128         bytesRead = socket.read(inBuffer);
129         if (bytesRead < 0) {
130                         throw new AsynchronousCloseException();
131         }
132
133         inBuffer.flip();
134         msgs = factory.parseMessages(inBuffer);
135         if (inBuffer.hasRemaining()) {
136             inBuffer.compact();
137         } else {
138             inBuffer.clear();
139         }
140         return msgs;
141     }
142 }