Merge "BUG-2218: Keep existing link augmentations during discovery process"
[controller.git] / opendaylight / adsal / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / MessageReadWriteService.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.nio.ByteBuffer;
12 import java.nio.channels.AsynchronousCloseException;
13 import java.nio.channels.ClosedChannelException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.util.List;
18
19 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
20 import org.openflow.protocol.OFMessage;
21 import org.openflow.protocol.factory.BasicFactory;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * This class implements methods to read/write messages over an established
27  * socket channel. The data exchange is in clear text format.
28  */
29 public class MessageReadWriteService implements IMessageReadWrite {
30     private static final Logger logger = LoggerFactory
31             .getLogger(MessageReadWriteService.class);
32     private static final int bufferSize = 1024 * 1024;
33
34     private Selector selector;
35     private SelectionKey clientSelectionKey;
36     private SocketChannel socket;
37     private ByteBuffer inBuffer;
38     private ByteBuffer outBuffer;
39     private BasicFactory factory;
40
41     public MessageReadWriteService(SocketChannel socket, Selector selector)
42             throws ClosedChannelException {
43         this.socket = socket;
44         this.selector = selector;
45         this.factory = new BasicFactory();
46         this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
47         this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
48         this.clientSelectionKey = this.socket.register(this.selector,
49                 SelectionKey.OP_READ);
50     }
51
52     /**
53      * Sends the OF message out over the socket channel.
54      *
55      * @param msg
56      *            OF message to be sent
57      * @throws Exception
58      */
59     @Override
60     public void asyncSend(OFMessage msg) throws Exception {
61         synchronized (outBuffer) {
62             int msgLen = msg.getLengthU();
63             if (outBuffer.remaining() < msgLen) {
64                 // increase the buffer size so that it can contain this message
65                 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
66                         .capacity() + msgLen);
67                 outBuffer.flip();
68                 newBuffer.put(outBuffer);
69                 outBuffer = newBuffer;
70             }
71         }
72         synchronized (outBuffer) {
73             msg.writeTo(outBuffer);
74
75             if (!socket.isOpen()) {
76                 return;
77             }
78
79             outBuffer.flip();
80             socket.write(outBuffer);
81             outBuffer.compact();
82             if (outBuffer.position() > 0) {
83                 this.clientSelectionKey = this.socket.register(this.selector,
84                         SelectionKey.OP_WRITE, this);
85             }
86             logger.trace("Message sent: {}", msg);
87         }
88     }
89
90     /**
91      * Resumes sending the remaining messages in the outgoing buffer
92      *
93      * @throws Exception
94      */
95     @Override
96     public void resumeSend() throws Exception {
97         synchronized (outBuffer) {
98             if (!socket.isOpen()) {
99                 return;
100             }
101
102             outBuffer.flip();
103             socket.write(outBuffer);
104             outBuffer.compact();
105             if (outBuffer.position() > 0) {
106                 this.clientSelectionKey = this.socket.register(this.selector,
107                         SelectionKey.OP_WRITE, this);
108             } else {
109                 this.clientSelectionKey = this.socket.register(this.selector,
110                         SelectionKey.OP_READ, this);
111             }
112         }
113     }
114
115     /**
116      * Reads the incoming network data from the socket and retrieves the OF
117      * messages.
118      *
119      * @return list of OF messages
120      * @throws Exception
121      */
122     @Override
123     public List<OFMessage> readMessages() throws Exception {
124         if (!socket.isOpen()) {
125             return null;
126         }
127
128         List<OFMessage> msgs = null;
129         int bytesRead = -1;
130         bytesRead = socket.read(inBuffer);
131         if (bytesRead < 0) {
132             throw new AsynchronousCloseException();
133         }
134
135         try {
136             inBuffer.flip();
137             msgs = factory.parseMessages(inBuffer);
138             if (inBuffer.hasRemaining()) {
139                 inBuffer.compact();
140             } else {
141                 inBuffer.clear();
142             }
143         } catch (Exception e) {
144             inBuffer.clear();
145             logger.debug("Caught exception: ", e);
146         }
147         return msgs;
148     }
149
150     @Override
151     public void stop() {
152         inBuffer = null;
153         outBuffer = null;
154     }
155 }