Merge "Controller IP-Address filtered using the "of.address" controller configuration...
[netvirt.git] / ovsdb / src / main / java / org / opendaylight / ovsdb / plugin / ConnectionService.java
1 package org.opendaylight.ovsdb.plugin;
2
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.bootstrap.ServerBootstrap;
5 import io.netty.channel.AdaptiveRecvByteBufAllocator;
6 import io.netty.channel.Channel;
7 import io.netty.channel.ChannelFuture;
8 import io.netty.channel.ChannelHandler;
9 import io.netty.channel.ChannelInitializer;
10 import io.netty.channel.ChannelOption;
11 import io.netty.channel.EventLoopGroup;
12 import io.netty.channel.nio.NioEventLoopGroup;
13 import io.netty.channel.socket.SocketChannel;
14 import io.netty.channel.socket.nio.NioServerSocketChannel;
15 import io.netty.channel.socket.nio.NioSocketChannel;
16 import io.netty.handler.codec.string.StringEncoder;
17 import io.netty.handler.logging.LogLevel;
18 import io.netty.handler.logging.LoggingHandler;
19 import io.netty.util.CharsetUtil;
20
21 import java.net.InetAddress;
22 import java.net.InetSocketAddress;
23 import java.util.Arrays;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.ExecutionException;
31
32 import org.opendaylight.controller.sal.connection.ConnectionConstants;
33 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
34 import org.opendaylight.controller.sal.core.Node;
35 import org.opendaylight.controller.sal.core.Property;
36 import org.opendaylight.controller.sal.utils.Status;
37 import org.opendaylight.controller.sal.utils.StatusCode;
38 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
39 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
40 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
41 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
42 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
43 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
44 import org.opendaylight.ovsdb.lib.message.TableUpdates;
45 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
46 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
47 import org.opendaylight.ovsdb.lib.table.internal.Table;
48 import org.opendaylight.ovsdb.lib.table.internal.Tables;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.fasterxml.jackson.annotation.JsonInclude.Include;
53 import com.fasterxml.jackson.databind.DeserializationFeature;
54 import com.fasterxml.jackson.databind.ObjectMapper;
55 import com.google.common.util.concurrent.ListenableFuture;
56
57
58 /**
59  * Represents the openflow plugin component in charge of programming the flows
60  * the flow programming and relay them to functional modules above SAL.
61  */
62 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
63     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
64
65     private static final Integer defaultOvsdbPort = 6640;
66     private static Integer ovsdbListenPort = defaultOvsdbPort;
67     private ConcurrentMap<String, Connection> ovsdbConnections;
68     private List<ChannelHandler> handlers = null;
69     private InventoryServiceInternal inventoryServiceInternal;
70     private Channel serverListenChannel = null;
71
72     public InventoryServiceInternal getInventoryServiceInternal() {
73         return inventoryServiceInternal;
74     }
75
76     public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
77         this.inventoryServiceInternal = inventoryServiceInternal;
78     }
79
80     public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
81         if (this.inventoryServiceInternal == inventoryServiceInternal) {
82             this.inventoryServiceInternal = null;
83         }
84     }
85
86     public void init() {
87         ovsdbConnections = new ConcurrentHashMap<String, Connection>();
88         int listenPort = defaultOvsdbPort;
89         String portString = System.getProperty("ovsdb.listenPort");
90         if (portString != null) {
91             listenPort = Integer.decode(portString).intValue();
92         }
93         ovsdbListenPort = listenPort;
94     }
95
96     /**
97      * Function called by the dependency manager when at least one dependency
98      * become unsatisfied or when the component is shutting down because for
99      * example bundle is being stopped.
100      */
101     void destroy() {
102     }
103
104     /**
105      * Function called by dependency manager after "init ()" is called and after
106      * the services provided by the class are registered in the service registry
107      */
108     void start() {
109         startOvsdbManager();
110     }
111
112     /**
113      * Function called by the dependency manager before the services exported by
114      * the component are unregistered, this will be followed by a "destroy ()"
115      * calls
116      */
117     void stopping() {
118         for (Connection connection : ovsdbConnections.values()) {
119             connection.disconnect();
120         }
121         serverListenChannel.disconnect();
122     }
123
124     @Override
125     public Status disconnect(Node node) {
126         String identifier = (String) node.getID();
127         Connection connection = ovsdbConnections.get(identifier);
128         if (connection != null) {
129             ovsdbConnections.remove(identifier);
130             return connection.disconnect();
131         } else {
132             return new Status(StatusCode.NOTFOUND);
133         }
134     }
135
136     @Override
137     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
138         InetAddress address;
139         Integer port;
140
141         try {
142             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
143         } catch (Exception e) {
144             e.printStackTrace();
145             return null;
146         }
147
148         try {
149             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
150             if (port == 0) port = defaultOvsdbPort;
151         } catch (Exception e) {
152             port = defaultOvsdbPort;
153         }
154
155         try {
156             Bootstrap bootstrap = new Bootstrap();
157             bootstrap.group(new NioEventLoopGroup());
158             bootstrap.channel(NioSocketChannel.class);
159             bootstrap.option(ChannelOption.TCP_NODELAY, true);
160             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
161
162             bootstrap.handler(new ChannelInitializer<SocketChannel>() {
163                 @Override
164                 public void initChannel(SocketChannel channel) throws Exception {
165                     if (handlers == null) {
166                         channel.pipeline().addLast(
167                                 //new LoggingHandler(LogLevel.INFO),
168                                 new JsonRpcDecoder(100000),
169                                 new StringEncoder(CharsetUtil.UTF_8));
170                     } else {
171                         for (ChannelHandler handler : handlers) {
172                             channel.pipeline().addLast(handler);
173                         }
174                     }
175                 }
176             });
177
178             ChannelFuture future = bootstrap.connect(address, port).sync();
179             Channel channel = future.channel();
180             return handleNewConnection(identifier, channel, this);
181         } catch (Exception e) {
182             e.printStackTrace();
183         }
184         return null;
185     }
186
187     public List<ChannelHandler> getHandlers() {
188         return handlers;
189     }
190
191     public void setHandlers(List<ChannelHandler> handlers) {
192         this.handlers = handlers;
193     }
194
195     @Override
196     public Connection getConnection(Node node) {
197         String identifier = (String) node.getID();
198         return ovsdbConnections.get(identifier);
199     }
200
201     @Override
202     public void notifyClusterViewChanged() {
203     }
204
205     @Override
206     public void notifyNodeDisconnectFromMaster(Node arg0) {
207     }
208
209     private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
210         Connection connection = new Connection(identifier, channel);
211         Node node = connection.getNode();
212
213         ObjectMapper objectMapper = new ObjectMapper();
214         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
215         objectMapper.setSerializationInclusion(Include.NON_NULL);
216
217         JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
218         JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
219         binderHandler.setNode(node);
220         channel.pipeline().addLast(binderHandler);
221
222         OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
223         connection.setRpc(ovsdb);
224         ovsdb.registerCallback(instance);
225         ovsdbConnections.put(identifier, connection);
226
227         ChannelConnectionHandler handler = new ChannelConnectionHandler();
228         handler.setNode(node);
229         handler.setConnectionService(this);
230         ChannelFuture closeFuture = channel.closeFuture();
231         closeFuture.addListener(handler);
232         // Keeping the Initial inventory update(s) on its own thread.
233         new Thread() {
234             Connection connection;
235             String identifier;
236
237             @Override
238             public void run() {
239                 try {
240                     initializeInventoryForNewNode(connection);
241                 } catch (Exception e) {
242                     e.printStackTrace();
243                     ovsdbConnections.remove(identifier);
244                 }
245             }
246             public Thread initializeConnectionParams(String identifier, Connection connection) {
247                 this.identifier = identifier;
248                 this.connection = connection;
249                 return this;
250             }
251         }.initializeConnectionParams(identifier, connection).start();
252         return node;
253     }
254
255     public void channelClosed(Node node) throws Exception {
256         logger.error("Connection to Node : {} closed", node);
257         inventoryServiceInternal.removeNode(node);
258     }
259
260     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
261         Channel channel = connection.getChannel();
262         InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
263         int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
264         IPAddressProperty addressProp = new IPAddressProperty(address);
265         L4PortProperty l4Port = new L4PortProperty(port);
266         Set<Property> props = new HashSet<Property>();
267         props.add(addressProp);
268         props.add(l4Port);
269         inventoryServiceInternal.addNode(connection.getNode(), props);
270
271         List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
272         ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
273         DatabaseSchema databaseSchema = dbSchemaF.get();
274         inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
275
276         MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
277         for (Table<?> table : Tables.getTables()) {
278             if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
279                 monitorReq.monitor(table);
280             } else {
281                 logger.warn("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
282             }
283         }
284
285         ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
286         TableUpdates updates = monResponse.get();
287         if (updates.getError() != null) {
288             logger.error("Error configuring monitor, error : {}, details : {}",
289                     updates.getError(),
290                     updates.getDetails());
291             /* FIXME: This should be cause for alarm */
292             throw new RuntimeException("Failed to setup a monitor in OVSDB");
293         }
294         UpdateNotification monitor = new UpdateNotification();
295         monitor.setUpdate(updates);
296         this.update(connection.getNode(), monitor);
297     }
298
299     private void startOvsdbManager() {
300         new Thread() {
301             @Override
302             public void run() {
303                 ovsdbManager();
304             }
305         }.start();
306     }
307
308     private void ovsdbManager() {
309         EventLoopGroup bossGroup = new NioEventLoopGroup();
310         EventLoopGroup workerGroup = new NioEventLoopGroup();
311         try {
312             ServerBootstrap b = new ServerBootstrap();
313             b.group(bossGroup, workerGroup)
314              .channel(NioServerSocketChannel.class)
315              .option(ChannelOption.SO_BACKLOG, 100)
316              .handler(new LoggingHandler(LogLevel.INFO))
317              .childHandler(new ChannelInitializer<SocketChannel>() {
318                  @Override
319                  public void initChannel(SocketChannel channel) throws Exception {
320                      logger.debug("New Passive channel created : "+ channel.toString());
321                      InetAddress address = channel.remoteAddress().getAddress();
322                      int port = channel.remoteAddress().getPort();
323                      String identifier = address.getHostAddress()+":"+port;
324                      channel.pipeline().addLast(
325                              new LoggingHandler(LogLevel.INFO),
326                              new JsonRpcDecoder(100000),
327                              new StringEncoder(CharsetUtil.UTF_8));
328
329                      Node node = handleNewConnection(identifier, channel, ConnectionService.this);
330                      logger.debug("Connected Node : "+node.toString());
331                  }
332              });
333             b.option(ChannelOption.TCP_NODELAY, true);
334             b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
335             // Start the server.
336             ChannelFuture f = b.bind(ovsdbListenPort).sync();
337             serverListenChannel =  f.channel();
338             // Wait until the server socket is closed.
339             serverListenChannel.closeFuture().sync();
340         } catch (InterruptedException e) {
341             e.printStackTrace();
342         } finally {
343             // Shut down all event loops to terminate all threads.
344             bossGroup.shutdownGracefully();
345             workerGroup.shutdownGracefully();
346         }
347     }
348
349     @Override
350     public void update(Node node, UpdateNotification updateNotification) {
351         if (updateNotification == null) return;
352         inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
353     }
354
355     @Override
356     public void locked(Node node, List<String> ids) {
357         // TODO Auto-generated method stub
358     }
359
360     @Override
361     public void stolen(Node node, List<String> ids) {
362         // TODO Auto-generated method stub
363     }
364
365 }