OVSDB Manager Passive listen support
[netvirt.git] / ovsdb / src / main / java / org / opendaylight / ovsdb / plugin / ConnectionService.java
1 package org.opendaylight.ovsdb.plugin;
2
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.bootstrap.ServerBootstrap;
5 import io.netty.channel.*;
6 import io.netty.channel.nio.NioEventLoopGroup;
7 import io.netty.channel.socket.SocketChannel;
8 import io.netty.channel.socket.nio.NioServerSocketChannel;
9 import io.netty.channel.socket.nio.NioSocketChannel;
10 import io.netty.handler.codec.string.StringEncoder;
11 import io.netty.handler.logging.LogLevel;
12 import io.netty.handler.logging.LoggingHandler;
13 import io.netty.util.CharsetUtil;
14
15 import org.opendaylight.controller.sal.connection.ConnectionConstants;
16 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
17 import org.opendaylight.controller.sal.core.Node;
18 import org.opendaylight.controller.sal.utils.Status;
19 import org.opendaylight.controller.sal.utils.StatusCode;
20 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
21 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
22 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
23 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
24 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
25 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
26 import org.opendaylight.ovsdb.lib.message.TableUpdates;
27 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
28 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
29 import org.opendaylight.ovsdb.lib.table.internal.Table;
30 import org.opendaylight.ovsdb.lib.table.internal.Tables;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import com.fasterxml.jackson.annotation.JsonInclude.Include;
35 import com.fasterxml.jackson.databind.DeserializationFeature;
36 import com.fasterxml.jackson.databind.ObjectMapper;
37 import com.google.common.util.concurrent.ListenableFuture;
38
39 import java.net.InetAddress;
40 import java.net.InetSocketAddress;
41 import java.util.Arrays;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.ConcurrentMap;
46 import java.util.concurrent.ExecutionException;
47
48
49 /**
50  * Represents the openflow plugin component in charge of programming the flows
51  * the flow programming and relay them to functional modules above SAL.
52  */
53 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
54     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
55
56     private static final Integer defaultOvsdbPort = 6640;
57     private static Integer ovsdbListenPort = defaultOvsdbPort;
58     private ConcurrentMap<String, Connection> ovsdbConnections;
59     private List<ChannelHandler> handlers = null;
60     private InventoryServiceInternal inventoryServiceInternal;
61     private Channel serverListenChannel = null;
62
63     public InventoryServiceInternal getInventoryServiceInternal() {
64         return inventoryServiceInternal;
65     }
66
67     public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
68         this.inventoryServiceInternal = inventoryServiceInternal;
69     }
70
71     public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
72         if (this.inventoryServiceInternal == inventoryServiceInternal) {
73             this.inventoryServiceInternal = null;
74         }
75     }
76
77     public void init() {
78         ovsdbConnections = new ConcurrentHashMap<String, Connection>();
79         int listenPort = defaultOvsdbPort;
80         String portString = System.getProperty("ovsdb.listenPort");
81         if (portString != null) {
82             listenPort = Integer.decode(portString).intValue();
83         }
84         ovsdbListenPort = listenPort;
85     }
86
87     /**
88      * Function called by the dependency manager when at least one dependency
89      * become unsatisfied or when the component is shutting down because for
90      * example bundle is being stopped.
91      */
92     void destroy() {
93     }
94
95     /**
96      * Function called by dependency manager after "init ()" is called and after
97      * the services provided by the class are registered in the service registry
98      */
99     void start() {
100         startOvsdbManager();
101     }
102
103     /**
104      * Function called by the dependency manager before the services exported by
105      * the component are unregistered, this will be followed by a "destroy ()"
106      * calls
107      */
108     void stop() {
109         for (Connection connection : ovsdbConnections.values()) {
110             connection.disconnect();
111         }
112         serverListenChannel.disconnect();
113     }
114
115     @Override
116     public Status disconnect(Node node) {
117         String identifier = (String) node.getID();
118         Connection connection = ovsdbConnections.get(identifier);
119         if (connection != null) {
120             ovsdbConnections.remove(identifier);
121             return connection.disconnect();
122         } else {
123             return new Status(StatusCode.NOTFOUND);
124         }
125     }
126
127     @Override
128     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
129         InetAddress address;
130         Integer port;
131
132         try {
133             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
134         } catch (Exception e) {
135             e.printStackTrace();
136             return null;
137         }
138
139         try {
140             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
141             if (port == 0) port = defaultOvsdbPort;
142         } catch (Exception e) {
143             port = defaultOvsdbPort;
144         }
145
146         try {
147             Bootstrap bootstrap = new Bootstrap();
148             bootstrap.group(new NioEventLoopGroup());
149             bootstrap.channel(NioSocketChannel.class);
150             bootstrap.option(ChannelOption.TCP_NODELAY, true);
151             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
152
153             bootstrap.handler(new ChannelInitializer<SocketChannel>() {
154                 @Override
155                 public void initChannel(SocketChannel channel) throws Exception {
156                     if (handlers == null) {
157                         channel.pipeline().addLast(
158                                 //new LoggingHandler(LogLevel.INFO),
159                                 new JsonRpcDecoder(100000),
160                                 new StringEncoder(CharsetUtil.UTF_8));
161                     } else {
162                         for (ChannelHandler handler : handlers) {
163                             channel.pipeline().addLast(handler);
164                         }
165                     }
166                 }
167             });
168
169             ChannelFuture future = bootstrap.connect(address, port).sync();
170             Channel channel = future.channel();
171             return handleNewConnection(identifier, channel, this);
172         } catch (Exception e) {
173             e.printStackTrace();
174         }
175         return null;
176     }
177
178     public List<ChannelHandler> getHandlers() {
179         return handlers;
180     }
181
182     public void setHandlers(List<ChannelHandler> handlers) {
183         this.handlers = handlers;
184     }
185
186     @Override
187     public Connection getConnection(Node node) {
188         String identifier = (String) node.getID();
189         return ovsdbConnections.get(identifier);
190     }
191
192     @Override
193     public void notifyClusterViewChanged() {
194     }
195
196     @Override
197     public void notifyNodeDisconnectFromMaster(Node arg0) {
198     }
199
200     private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
201         Connection connection = new Connection(identifier, channel);
202         Node node = connection.getNode();
203
204         ObjectMapper objectMapper = new ObjectMapper();
205         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
206         objectMapper.setSerializationInclusion(Include.NON_NULL);
207
208         JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
209         JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
210         binderHandler.setNode(node);
211         channel.pipeline().addLast(binderHandler);
212
213         OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
214         connection.setRpc(ovsdb);
215         ovsdb.registerCallback(instance);
216
217         // Keeping the Initial inventory update(s) on its own thread.
218         new Thread() {
219             Connection connection;
220             String identifier;
221
222             public void run() {
223                 try {
224                     initializeInventoryForNewNode(connection);
225                     ovsdbConnections.put(identifier, connection);
226                 } catch (InterruptedException e) {
227                     e.printStackTrace();
228                 } catch (ExecutionException e) {
229                     e.printStackTrace();
230                 }
231             }
232             public Thread initializeConnectionParams(String identifier, Connection connection) {
233                 this.identifier = identifier;
234                 this.connection = connection;
235                 return this;
236             }
237         }.initializeConnectionParams(identifier, connection).start();
238         return node;
239     }
240
241     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
242         Channel channel = connection.getChannel();
243         InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
244         int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
245         IPAddressProperty addressProp = new IPAddressProperty(address);
246         L4PortProperty l4Port = new L4PortProperty(port);
247         inventoryServiceInternal.addNodeProperty(connection.getNode(), addressProp);
248         inventoryServiceInternal.addNodeProperty(connection.getNode(), l4Port);
249
250         List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
251         ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
252         DatabaseSchema databaseSchema = dbSchemaF.get();
253         inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
254
255         MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
256         for (Table<?> table : Tables.getTables()) {
257             if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
258                 monitorReq.monitor(table);
259             } else {
260                 logger.warn("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
261             }
262         }
263
264         ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
265         TableUpdates updates = monResponse.get();
266         if (updates.getError() != null) {
267             logger.error("Error configuring monitor, error : {}, details : {}",
268                     updates.getError(),
269                     updates.getDetails());
270             /* FIXME: This should be cause for alarm */
271             throw new RuntimeException("Failed to setup a monitor in OVSDB");
272         }
273         UpdateNotification monitor = new UpdateNotification();
274         monitor.setUpdate(updates);
275         this.update(connection.getNode(), monitor);
276     }
277
278     private void startOvsdbManager() {
279         new Thread() {
280             public void run() {
281                 ovsdbManager();
282             }
283         }.start();
284     }
285
286     private void ovsdbManager() {
287         EventLoopGroup bossGroup = new NioEventLoopGroup();
288         EventLoopGroup workerGroup = new NioEventLoopGroup();
289         try {
290             ServerBootstrap b = new ServerBootstrap();
291             b.group(bossGroup, workerGroup)
292              .channel(NioServerSocketChannel.class)
293              .option(ChannelOption.SO_BACKLOG, 100)
294              .handler(new LoggingHandler(LogLevel.INFO))
295              .childHandler(new ChannelInitializer<SocketChannel>() {
296                  @Override
297                  public void initChannel(SocketChannel channel) throws Exception {
298                      logger.debug("New Passive channel created : "+ channel.toString());
299                      InetAddress address = channel.remoteAddress().getAddress();
300                      int port = channel.remoteAddress().getPort();
301                      String identifier = address.getHostAddress()+":"+port;
302                      channel.pipeline().addLast(
303                              new LoggingHandler(LogLevel.INFO),
304                              new JsonRpcDecoder(100000),
305                              new StringEncoder(CharsetUtil.UTF_8));
306
307                      Node node = handleNewConnection(identifier, channel, ConnectionService.this);
308                      logger.debug("Connected Node : "+node.toString());
309                  }
310              });
311             b.option(ChannelOption.TCP_NODELAY, true);
312             b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
313             // Start the server.
314             ChannelFuture f = b.bind(ovsdbListenPort).sync();
315             serverListenChannel =  f.channel();
316             // Wait until the server socket is closed.
317             serverListenChannel.closeFuture().sync();
318         } catch (InterruptedException e) {
319             e.printStackTrace();
320         } finally {
321             // Shut down all event loops to terminate all threads.
322             bossGroup.shutdownGracefully();
323             workerGroup.shutdownGracefully();
324         }
325     }
326
327     @Override
328     public void update(Node node, UpdateNotification updateNotification) {
329         inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
330     }
331
332     @Override
333     public void locked(Node node, List<String> ids) {
334         // TODO Auto-generated method stub
335     }
336
337     @Override
338     public void stolen(Node node, List<String> ids) {
339         // TODO Auto-generated method stub
340     }
341
342 }