1 package org.opendaylight.ovsdb.plugin;
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.bootstrap.ServerBootstrap;
5 import io.netty.channel.*;
6 import io.netty.channel.nio.NioEventLoopGroup;
7 import io.netty.channel.socket.SocketChannel;
8 import io.netty.channel.socket.nio.NioServerSocketChannel;
9 import io.netty.channel.socket.nio.NioSocketChannel;
10 import io.netty.handler.codec.string.StringEncoder;
11 import io.netty.handler.logging.LogLevel;
12 import io.netty.handler.logging.LoggingHandler;
13 import io.netty.util.CharsetUtil;
15 import org.opendaylight.controller.sal.connection.ConnectionConstants;
16 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
17 import org.opendaylight.controller.sal.core.Node;
18 import org.opendaylight.controller.sal.utils.Status;
19 import org.opendaylight.controller.sal.utils.StatusCode;
20 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
21 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
22 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
23 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
24 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
25 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
26 import org.opendaylight.ovsdb.lib.message.TableUpdates;
27 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
28 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
29 import org.opendaylight.ovsdb.lib.table.internal.Table;
30 import org.opendaylight.ovsdb.lib.table.internal.Tables;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 import com.fasterxml.jackson.annotation.JsonInclude.Include;
35 import com.fasterxml.jackson.databind.DeserializationFeature;
36 import com.fasterxml.jackson.databind.ObjectMapper;
37 import com.google.common.util.concurrent.ListenableFuture;
39 import java.net.InetAddress;
40 import java.net.InetSocketAddress;
41 import java.util.Arrays;
42 import java.util.List;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.ConcurrentMap;
46 import java.util.concurrent.ExecutionException;
50 * Represents the openflow plugin component in charge of programming the flows
51 * the flow programming and relay them to functional modules above SAL.
53 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
54 protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
56 private static final Integer defaultOvsdbPort = 6640;
57 private static Integer ovsdbListenPort = defaultOvsdbPort;
58 private ConcurrentMap<String, Connection> ovsdbConnections;
59 private List<ChannelHandler> handlers = null;
60 private InventoryServiceInternal inventoryServiceInternal;
61 private Channel serverListenChannel = null;
63 public InventoryServiceInternal getInventoryServiceInternal() {
64 return inventoryServiceInternal;
67 public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
68 this.inventoryServiceInternal = inventoryServiceInternal;
71 public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
72 if (this.inventoryServiceInternal == inventoryServiceInternal) {
73 this.inventoryServiceInternal = null;
78 ovsdbConnections = new ConcurrentHashMap<String, Connection>();
79 int listenPort = defaultOvsdbPort;
80 String portString = System.getProperty("ovsdb.listenPort");
81 if (portString != null) {
82 listenPort = Integer.decode(portString).intValue();
84 ovsdbListenPort = listenPort;
88 * Function called by the dependency manager when at least one dependency
89 * become unsatisfied or when the component is shutting down because for
90 * example bundle is being stopped.
96 * Function called by dependency manager after "init ()" is called and after
97 * the services provided by the class are registered in the service registry
104 * Function called by the dependency manager before the services exported by
105 * the component are unregistered, this will be followed by a "destroy ()"
109 for (Connection connection : ovsdbConnections.values()) {
110 connection.disconnect();
112 serverListenChannel.disconnect();
116 public Status disconnect(Node node) {
117 String identifier = (String) node.getID();
118 Connection connection = ovsdbConnections.get(identifier);
119 if (connection != null) {
120 ovsdbConnections.remove(identifier);
121 return connection.disconnect();
123 return new Status(StatusCode.NOTFOUND);
128 public Node connect(String identifier, Map<ConnectionConstants, String> params) {
133 address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
134 } catch (Exception e) {
140 port = Integer.parseInt(params.get(ConnectionConstants.PORT));
141 if (port == 0) port = defaultOvsdbPort;
142 } catch (Exception e) {
143 port = defaultOvsdbPort;
147 Bootstrap bootstrap = new Bootstrap();
148 bootstrap.group(new NioEventLoopGroup());
149 bootstrap.channel(NioSocketChannel.class);
150 bootstrap.option(ChannelOption.TCP_NODELAY, true);
151 bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
153 bootstrap.handler(new ChannelInitializer<SocketChannel>() {
155 public void initChannel(SocketChannel channel) throws Exception {
156 if (handlers == null) {
157 channel.pipeline().addLast(
158 //new LoggingHandler(LogLevel.INFO),
159 new JsonRpcDecoder(100000),
160 new StringEncoder(CharsetUtil.UTF_8));
162 for (ChannelHandler handler : handlers) {
163 channel.pipeline().addLast(handler);
169 ChannelFuture future = bootstrap.connect(address, port).sync();
170 Channel channel = future.channel();
171 return handleNewConnection(identifier, channel, this);
172 } catch (Exception e) {
178 public List<ChannelHandler> getHandlers() {
182 public void setHandlers(List<ChannelHandler> handlers) {
183 this.handlers = handlers;
187 public Connection getConnection(Node node) {
188 String identifier = (String) node.getID();
189 return ovsdbConnections.get(identifier);
193 public void notifyClusterViewChanged() {
197 public void notifyNodeDisconnectFromMaster(Node arg0) {
200 private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
201 Connection connection = new Connection(identifier, channel);
202 Node node = connection.getNode();
204 ObjectMapper objectMapper = new ObjectMapper();
205 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
206 objectMapper.setSerializationInclusion(Include.NON_NULL);
208 JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
209 JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
210 binderHandler.setNode(node);
211 channel.pipeline().addLast(binderHandler);
213 OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
214 connection.setRpc(ovsdb);
215 ovsdb.registerCallback(instance);
217 // Keeping the Initial inventory update(s) on its own thread.
219 Connection connection;
224 initializeInventoryForNewNode(connection);
225 ovsdbConnections.put(identifier, connection);
226 } catch (InterruptedException e) {
228 } catch (ExecutionException e) {
232 public Thread initializeConnectionParams(String identifier, Connection connection) {
233 this.identifier = identifier;
234 this.connection = connection;
237 }.initializeConnectionParams(identifier, connection).start();
241 private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
242 Channel channel = connection.getChannel();
243 InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
244 int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
245 IPAddressProperty addressProp = new IPAddressProperty(address);
246 L4PortProperty l4Port = new L4PortProperty(port);
247 inventoryServiceInternal.addNodeProperty(connection.getNode(), addressProp);
248 inventoryServiceInternal.addNodeProperty(connection.getNode(), l4Port);
250 List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
251 ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
252 DatabaseSchema databaseSchema = dbSchemaF.get();
253 inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
255 MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
256 for (Table<?> table : Tables.getTables()) {
257 if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
258 monitorReq.monitor(table);
260 logger.warn("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
264 ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
265 TableUpdates updates = monResponse.get();
266 if (updates.getError() != null) {
267 logger.error("Error configuring monitor, error : {}, details : {}",
269 updates.getDetails());
270 /* FIXME: This should be cause for alarm */
271 throw new RuntimeException("Failed to setup a monitor in OVSDB");
273 UpdateNotification monitor = new UpdateNotification();
274 monitor.setUpdate(updates);
275 this.update(connection.getNode(), monitor);
278 private void startOvsdbManager() {
286 private void ovsdbManager() {
287 EventLoopGroup bossGroup = new NioEventLoopGroup();
288 EventLoopGroup workerGroup = new NioEventLoopGroup();
290 ServerBootstrap b = new ServerBootstrap();
291 b.group(bossGroup, workerGroup)
292 .channel(NioServerSocketChannel.class)
293 .option(ChannelOption.SO_BACKLOG, 100)
294 .handler(new LoggingHandler(LogLevel.INFO))
295 .childHandler(new ChannelInitializer<SocketChannel>() {
297 public void initChannel(SocketChannel channel) throws Exception {
298 logger.debug("New Passive channel created : "+ channel.toString());
299 InetAddress address = channel.remoteAddress().getAddress();
300 int port = channel.remoteAddress().getPort();
301 String identifier = address.getHostAddress()+":"+port;
302 channel.pipeline().addLast(
303 new LoggingHandler(LogLevel.INFO),
304 new JsonRpcDecoder(100000),
305 new StringEncoder(CharsetUtil.UTF_8));
307 Node node = handleNewConnection(identifier, channel, ConnectionService.this);
308 logger.debug("Connected Node : "+node.toString());
311 b.option(ChannelOption.TCP_NODELAY, true);
312 b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
314 ChannelFuture f = b.bind(ovsdbListenPort).sync();
315 serverListenChannel = f.channel();
316 // Wait until the server socket is closed.
317 serverListenChannel.closeFuture().sync();
318 } catch (InterruptedException e) {
321 // Shut down all event loops to terminate all threads.
322 bossGroup.shutdownGracefully();
323 workerGroup.shutdownGracefully();
328 public void update(Node node, UpdateNotification updateNotification) {
329 inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
333 public void locked(Node node, List<String> ids) {
334 // TODO Auto-generated method stub
338 public void stolen(Node node, List<String> ids) {
339 // TODO Auto-generated method stub