Refactor of the OVSDB Plugin
[netvirt.git] / plugin / src / main / java / org / opendaylight / ovsdb / plugin / impl / ConnectionServiceImpl.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
9  */
10 package org.opendaylight.ovsdb.plugin.impl;
11
12 import io.netty.channel.ChannelHandler;
13
14 import java.io.IOException;
15 import java.net.InetAddress;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25
26 import org.opendaylight.controller.sal.connection.ConnectionConstants;
27 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
28 import org.opendaylight.controller.sal.core.Node;
29 import org.opendaylight.controller.sal.core.Property;
30 import org.opendaylight.controller.sal.utils.Status;
31 import org.opendaylight.controller.sal.utils.StatusCode;
32 import org.opendaylight.ovsdb.lib.MonitorCallBack;
33 import org.opendaylight.ovsdb.lib.OvsdbClient;
34 import org.opendaylight.ovsdb.lib.OvsdbConnection;
35 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
36 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
37 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
38 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
39 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
40 import org.opendaylight.ovsdb.lib.message.TableUpdates;
41 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
42 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
43 import org.opendaylight.ovsdb.lib.schema.TableSchema;
44 import org.opendaylight.ovsdb.plugin.IConnectionServiceInternal;
45 import org.opendaylight.ovsdb.plugin.api.Connection;
46 import org.opendaylight.ovsdb.plugin.internal.IPAddressProperty;
47 import org.opendaylight.ovsdb.plugin.internal.L4PortProperty;
48 import org.opendaylight.ovsdb.plugin.api.OvsdbConnectionService;
49 import org.opendaylight.ovsdb.plugin.api.OvsdbInventoryService;
50
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.collect.Lists;
55
56
57 /**
58  * Represents the openflow plugin component in charge of programming the flows
59  * the flow programming and relay them to functional modules above SAL.
60  */
61 public class ConnectionServiceImpl implements IPluginInConnectionService,
62                                               OvsdbConnectionService,
63                                               IConnectionServiceInternal,
64                                               OvsdbConnectionListener {
65     protected static final Logger logger = LoggerFactory.getLogger(ConnectionServiceImpl.class);
66
67     // Properties that can be set in config.ini
68     private static final Integer defaultOvsdbPort = 6640;
69
70
71     private ConcurrentMap<String, Connection> ovsdbConnections = new ConcurrentHashMap<String, Connection>();
72     private List<ChannelHandler> handlers = null;
73
74     private volatile OvsdbInventoryService ovsdbInventoryService;
75     private volatile OvsdbConnection connectionLib;
76
77     public void setOvsdbInventoryService(OvsdbInventoryService inventoryService) {
78         this.ovsdbInventoryService = inventoryService;
79     }
80
81     public void setOvsdbConnection(OvsdbConnection ovsdbConnection) {
82         this.connectionLib = ovsdbConnection;
83     }
84
85     public void init() {
86     }
87
88     /**
89      * Function called by the dependency manager when at least one dependency
90      * become unsatisfied or when the component is shutting down because for
91      * example bundle is being stopped.
92      */
93     void destroy() {
94     }
95
96     /**
97      * Function called by dependency manager after "init ()" is called and after
98      * the services provided by the class are registered in the service registry
99      */
100     void start() {
101         Collection<OvsdbClient> connections = connectionLib.getConnections();
102         for (OvsdbClient client : connections) {
103             this.connected(client);
104         }
105     }
106
107     /**
108      * Function called by the dependency manager before the services exported by
109      * the component are unregistered, this will be followed by a "destroy ()"
110      * calls
111      */
112     void stopping() {
113         for (Connection connection : ovsdbConnections.values()) {
114             connection.disconnect();
115         }
116     }
117
118     @Override
119     public Status disconnect(Node node) {
120         String identifier = (String) node.getID();
121         Connection connection = ovsdbConnections.get(identifier);
122         if (connection != null) {
123             ovsdbConnections.remove(identifier);
124             connection.disconnect();
125             ovsdbInventoryService.removeNode(node);
126             return new Status(StatusCode.SUCCESS);
127         } else {
128             return new Status(StatusCode.NOTFOUND);
129         }
130     }
131
132     @Override
133     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
134         InetAddress address;
135         Integer port;
136
137         try {
138             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
139         } catch (Exception e) {
140             logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
141             return null;
142         }
143
144         try {
145             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
146             if (port == 0) port = defaultOvsdbPort;
147         } catch (Exception e) {
148             port = defaultOvsdbPort;
149         }
150
151         try {
152             OvsdbClient client = connectionLib.connect(address, port);
153             return handleNewConnection(identifier, client);
154         } catch (InterruptedException e) {
155             logger.error("Thread was interrupted during connect", e);
156         } catch (ExecutionException e) {
157             logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
158         }
159         return null;
160     }
161
162     public List<ChannelHandler> getHandlers() {
163         return handlers;
164     }
165
166     public void setHandlers(List<ChannelHandler> handlers) {
167         this.handlers = handlers;
168     }
169
170     @Override
171     public Connection getConnection(Node node) {
172         String identifier = (String) node.getID();
173         return ovsdbConnections.get(identifier);
174     }
175
176     @Override
177     public List<Node> getNodes() {
178         List<Node> nodes = new ArrayList<Node>();
179         for (Connection connection : ovsdbConnections.values()) {
180             nodes.add(connection.getNode());
181         }
182         return nodes;
183     }
184
185     @Override
186     public void notifyClusterViewChanged() {
187     }
188
189     @Override
190     public void notifyNodeDisconnectFromMaster(Node arg0) {
191     }
192
193     private Node handleNewConnection(String identifier, OvsdbClient client) throws InterruptedException, ExecutionException {
194         Connection connection = new Connection(identifier, client);
195         Node node = connection.getNode();
196         ovsdbConnections.put(identifier, connection);
197         List<String> dbs = client.getDatabases().get();
198         for (String db : dbs) {
199             client.getSchema(db).get();
200         }
201         // Keeping the Initial inventory update(s) on its own thread.
202         new Thread() {
203             Connection connection;
204             String identifier;
205
206             @Override
207             public void run() {
208                 try {
209                     logger.info("Initialize inventory for {}", connection.toString());
210                     initializeInventoryForNewNode(connection);
211                 } catch (InterruptedException | ExecutionException | IOException e) {
212                     logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
213                     ovsdbConnections.remove(identifier);
214                 }
215             }
216             public Thread initializeConnectionParams(String identifier, Connection connection) {
217                 this.identifier = identifier;
218                 this.connection = connection;
219                 return this;
220             }
221         }.initializeConnectionParams(identifier, connection).start();
222         return node;
223     }
224
225     public void channelClosed(Node node) throws Exception {
226         logger.info("Connection to Node : {} closed", node);
227         disconnect(node);
228         ovsdbInventoryService.removeNode(node);
229     }
230
231     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException, IOException {
232         OvsdbClient client = connection.getClient();
233         InetAddress address = client.getConnectionInfo().getRemoteAddress();
234         int port = client.getConnectionInfo().getRemotePort();
235         IPAddressProperty addressProp = new IPAddressProperty(address);
236         L4PortProperty l4Port = new L4PortProperty(port);
237         Set<Property> props = new HashSet<Property>();
238         props.add(addressProp);
239         props.add(l4Port);
240         logger.info("Add node to ovsdb inventory service {}", connection.getNode().toString());
241         ovsdbInventoryService.addNode(connection.getNode(), props);
242
243         List<String> databases = client.getDatabases().get();
244         if (databases == null) {
245             logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
246             return;
247         }
248         for (String database : databases) {
249             DatabaseSchema dbSchema = client.getSchema(database).get();
250             TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
251             ovsdbInventoryService.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
252         }
253         logger.info("Notifying Inventory Listeners for Node Added: {}", connection.getNode().toString());
254         ovsdbInventoryService.notifyNodeAdded(connection.getNode());
255     }
256
257     public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
258         String identifier = (String) node.getID();
259         Connection connection = ovsdbConnections.get(identifier);
260         OvsdbClient client = connection.getClient();
261         if (dbSchema == null) {
262             logger.error("Unable to get Database Schema for the ovsdb connection : {}", client.getConnectionInfo());
263             return null;
264         }
265         Set<String> tables = dbSchema.getTables();
266         if (tables == null) {
267             logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
268             return null;
269         }
270         List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
271         for (String tableName : tables) {
272             GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
273             monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
274         }
275         return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
276     }
277
278     /**
279      * As per RFC 7047, section 4.1.5, if a Monitor request is sent without any columns, the update response will not include
280      * the _uuid column.
281      * ----------------------------------------------------------------------------------------------------------------------------------
282      * Each <monitor-request> specifies one or more columns and the manner in which the columns (or the entire table) are to be monitored.
283      * The "columns" member specifies the columns whose values are monitored. It MUST NOT contain duplicates.
284      * If "columns" is omitted, all columns in the table, except for "_uuid", are monitored.
285      * ----------------------------------------------------------------------------------------------------------------------------------
286      * In order to overcome this limitation, this method
287      *
288      * @return MonitorRequest that includes all the Bridge Columns including _uuid
289      */
290     public <T extends TableSchema<T>> MonitorRequest<T> getAllColumnsMonitorRequest (T tableSchema) {
291         Set<String> columns = tableSchema.getColumns();
292         MonitorRequestBuilder<T> monitorBuilder = MonitorRequestBuilder.builder(tableSchema);
293         for (String column : columns) {
294             monitorBuilder.addColumn(column);
295         }
296         return monitorBuilder.with(new MonitorSelect(true, true, true, true)).build();
297     }
298
299     private class UpdateMonitor implements MonitorCallBack {
300         Node node = null;
301         public UpdateMonitor(Node node) {
302             this.node = node;
303         }
304
305         @Override
306         public void update(TableUpdates result, DatabaseSchema dbSchema) {
307             ovsdbInventoryService.processTableUpdates(node, dbSchema.getName(), result);
308         }
309
310         @Override
311         public void exception(Throwable t) {
312             System.out.println("Exception t = " + t);
313         }
314     }
315
316     private String getConnectionIdentifier(OvsdbClient client) {
317         OvsdbConnectionInfo info = client.getConnectionInfo();
318         return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
319     }
320
321
322     @Override
323     public void connected(OvsdbClient client) {
324         String identifier = getConnectionIdentifier(client);
325         try {
326             this.handleNewConnection(identifier, client);
327         } catch (InterruptedException | ExecutionException e) {
328             e.printStackTrace();
329         }
330     }
331
332     @Override
333     public void disconnected(OvsdbClient client) {
334         Connection connection = ovsdbConnections.get(this.getConnectionIdentifier(client));
335         if (connection == null) return;
336         this.disconnect(connection.getNode());
337     }
338 }