Switch node-id's for ovsdb-managed-nodes
[ovsdb.git] / plugin / src / main / java / org / opendaylight / ovsdb / plugin / impl / ConnectionServiceImpl.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
9  */
10 package org.opendaylight.ovsdb.plugin.impl;
11
12 import io.netty.channel.ChannelHandler;
13
14 import java.io.IOException;
15 import java.net.InetAddress;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25
26 import org.opendaylight.controller.sal.core.Node;
27 import org.opendaylight.controller.sal.core.Property;
28 import org.opendaylight.ovsdb.lib.MonitorCallBack;
29 import org.opendaylight.ovsdb.lib.OvsdbClient;
30 import org.opendaylight.ovsdb.lib.OvsdbConnection;
31 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
32 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
33 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
34 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
35 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
36 import org.opendaylight.ovsdb.lib.message.TableUpdates;
37 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
38 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
39 import org.opendaylight.ovsdb.lib.schema.TableSchema;
40 import org.opendaylight.ovsdb.plugin.api.Connection;
41 import org.opendaylight.ovsdb.plugin.api.ConnectionConstants;
42 import org.opendaylight.ovsdb.plugin.api.Status;
43 import org.opendaylight.ovsdb.plugin.api.StatusCode;
44 import org.opendaylight.ovsdb.plugin.internal.IPAddressProperty;
45 import org.opendaylight.ovsdb.plugin.internal.L4PortProperty;
46 import org.opendaylight.ovsdb.plugin.api.OvsdbConnectionService;
47 import org.opendaylight.ovsdb.plugin.api.OvsdbInventoryService;
48 import org.opendaylight.ovsdb.utils.config.ConfigProperties;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.google.common.collect.Lists;
53
54
55 /**
56  * Represents the openflow plugin component in charge of programming the flows
57  * the flow programming and relay them to functional modules above SAL.
58  */
59 public class ConnectionServiceImpl implements OvsdbConnectionService,
60                                               OvsdbConnectionListener {
61     protected static final Logger logger = LoggerFactory.getLogger(ConnectionServiceImpl.class);
62
63     // Properties that can be set in config.ini
64     private static final Integer DEFAULT_OVSDB_PORT = 6640;
65     private static final String OVSDB_LISTENPORT = "ovsdb.listenPort";
66
67
68     public void putOvsdbConnection (String identifier, Connection connection) {
69         ovsdbConnections.put(identifier, connection);
70     }
71
72     private ConcurrentMap<String, Connection> ovsdbConnections = new ConcurrentHashMap<String, Connection>();
73     private List<ChannelHandler> handlers = null;
74
75     private volatile OvsdbInventoryService ovsdbInventoryService;
76     private volatile OvsdbConnection connectionLib;
77
78     public void setOvsdbInventoryService(OvsdbInventoryService inventoryService) {
79         this.ovsdbInventoryService = inventoryService;
80     }
81
82     public void setOvsdbConnection(OvsdbConnection ovsdbConnection) {
83         this.connectionLib = ovsdbConnection;
84     }
85
86     public void init() {
87     }
88
89     /**
90      * Function called by the dependency manager when at least one dependency
91      * become unsatisfied or when the component is shutting down because for
92      * example bundle is being stopped.
93      */
94     void destroy() {
95     }
96
97     /**
98      * Function called by dependency manager after "init ()" is called and after
99      * the services provided by the class are registered in the service registry
100      */
101     void start() {
102         /* Start ovsdb server before getting connection clients */
103         String portString = ConfigProperties.getProperty(OvsdbConnectionService.class, OVSDB_LISTENPORT);
104         int ovsdbListenPort = DEFAULT_OVSDB_PORT;
105         if (portString != null) {
106             ovsdbListenPort = Integer.decode(portString).intValue();
107         }
108
109         if (!connectionLib.startOvsdbManager(ovsdbListenPort)) {
110             logger.warn("Start OVSDB manager call from ConnectionService was not necessary");
111         }
112
113         /* Then get connection clients */
114         Collection<OvsdbClient> connections = connectionLib.getConnections();
115         for (OvsdbClient client : connections) {
116             logger.info("CONNECT start connected clients client = {}", client);
117             this.connected(client);
118         }
119     }
120
121     /**
122      * Function called by the dependency manager before the services exported by
123      * the component are unregistered, this will be followed by a "destroy ()"
124      * calls
125      */
126     void stopping() {
127         for (Connection connection : ovsdbConnections.values()) {
128             connection.disconnect();
129         }
130     }
131
132     public Status disconnect(Node node) {
133         String identifier = (String) node.getID();
134         Connection connection = ovsdbConnections.get(identifier);
135         if (connection != null) {
136             ovsdbConnections.remove(identifier);
137             connection.disconnect();
138             ovsdbInventoryService.removeNode(node);
139             return new Status(StatusCode.SUCCESS);
140         } else {
141             return new Status(StatusCode.NOTFOUND);
142         }
143     }
144
145     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
146         InetAddress address;
147         Integer port;
148
149         try {
150             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
151         } catch (Exception e) {
152             logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
153             return null;
154         }
155
156         try {
157             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
158             if (port == 0) port = DEFAULT_OVSDB_PORT;
159         } catch (Exception e) {
160             port = DEFAULT_OVSDB_PORT;
161         }
162
163         try {
164             OvsdbClient client = connectionLib.connect(address, port);
165             return handleNewConnection(identifier, client);
166         } catch (InterruptedException e) {
167             logger.error("Thread was interrupted during connect", e);
168         } catch (ExecutionException e) {
169             logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
170         }
171         return null;
172     }
173
174     public List<ChannelHandler> getHandlers() {
175         return handlers;
176     }
177
178     public void setHandlers(List<ChannelHandler> handlers) {
179         this.handlers = handlers;
180     }
181
182     @Override
183     public Connection getConnection(Node node) {
184         String identifier = (String) node.getID();
185         return ovsdbConnections.get(identifier);
186     }
187
188     @Override
189     public Node getNode (String identifier) {
190         String id = identifier;
191
192         String[] pair = identifier.split("\\|");
193         if (pair[0].equals("OVS")) {
194             id = pair[1];
195         }
196
197         Connection connection = ovsdbConnections.get(id);
198         if (connection != null) {
199             return connection.getNode();
200         } else {
201             return null;
202         }
203     }
204
205     @Override
206     public List<Node> getNodes() {
207         List<Node> nodes = new ArrayList<Node>();
208         for (Connection connection : ovsdbConnections.values()) {
209             nodes.add(connection.getNode());
210         }
211         return nodes;
212     }
213
214     private Node handleNewConnection(String identifier, OvsdbClient client) throws InterruptedException, ExecutionException {
215         Connection connection = new Connection(identifier, client);
216         Node node = connection.getNode();
217         ovsdbConnections.put(identifier, connection);
218         List<String> dbs = client.getDatabases().get();
219         for (String db : dbs) {
220             client.getSchema(db).get();
221         }
222         // Keeping the Initial inventory update(s) on its own thread.
223         new Thread() {
224             Connection connection;
225             String identifier;
226
227             @Override
228             public void run() {
229                 try {
230                     logger.info("Initialize inventory for {}", connection.toString());
231                     initializeInventoryForNewNode(connection);
232                 } catch (InterruptedException | ExecutionException | IOException e) {
233                     logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
234                     ovsdbConnections.remove(identifier);
235                 }
236             }
237             public Thread initializeConnectionParams(String identifier, Connection connection) {
238                 this.identifier = identifier;
239                 this.connection = connection;
240                 return this;
241             }
242         }.initializeConnectionParams(identifier, connection).start();
243         return node;
244     }
245
246     public void channelClosed(Node node) throws Exception {
247         logger.info("Connection to Node : {} closed", node);
248         disconnect(node);
249         ovsdbInventoryService.removeNode(node);
250     }
251
252     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException, IOException {
253         OvsdbClient client = connection.getClient();
254         InetAddress address = client.getConnectionInfo().getRemoteAddress();
255         int port = client.getConnectionInfo().getRemotePort();
256         IPAddressProperty addressProp = new IPAddressProperty(address);
257         L4PortProperty l4Port = new L4PortProperty(port);
258         Set<Property> props = new HashSet<Property>();
259         props.add(addressProp);
260         props.add(l4Port);
261         logger.info("Add node to ovsdb inventory service {}", connection.getNode().toString());
262         ovsdbInventoryService.addNode(connection.getNode(), props);
263
264         List<String> databases = client.getDatabases().get();
265         if (databases == null) {
266             logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
267             return;
268         }
269         for (String database : databases) {
270             DatabaseSchema dbSchema = client.getSchema(database).get();
271             TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
272             ovsdbInventoryService.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
273         }
274         logger.info("Notifying Inventory Listeners for Node Added: {}", connection.getNode().toString());
275         ovsdbInventoryService.notifyNodeAdded(connection.getNode(), address, port);
276     }
277
278     public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
279         String identifier = (String) node.getID();
280         Connection connection = ovsdbConnections.get(identifier);
281         OvsdbClient client = connection.getClient();
282         if (dbSchema == null) {
283             logger.error("Unable to get Database Schema for the ovsdb connection : {}", client.getConnectionInfo());
284             return null;
285         }
286         Set<String> tables = dbSchema.getTables();
287         if (tables == null) {
288             logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
289             return null;
290         }
291         List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
292         for (String tableName : tables) {
293             GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
294             monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
295         }
296         return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
297     }
298
299     /**
300      * As per RFC 7047, section 4.1.5, if a Monitor request is sent without any columns, the update response will not include
301      * the _uuid column.
302      * ----------------------------------------------------------------------------------------------------------------------------------
303      * Each <monitor-request> specifies one or more columns and the manner in which the columns (or the entire table) are to be monitored.
304      * The "columns" member specifies the columns whose values are monitored. It MUST NOT contain duplicates.
305      * If "columns" is omitted, all columns in the table, except for "_uuid", are monitored.
306      * ----------------------------------------------------------------------------------------------------------------------------------
307      * In order to overcome this limitation, this method
308      *
309      * @return MonitorRequest that includes all the Bridge Columns including _uuid
310      */
311     public <T extends TableSchema<T>> MonitorRequest<T> getAllColumnsMonitorRequest (T tableSchema) {
312         Set<String> columns = tableSchema.getColumns();
313         MonitorRequestBuilder<T> monitorBuilder = MonitorRequestBuilder.builder(tableSchema);
314         for (String column : columns) {
315             monitorBuilder.addColumn(column);
316         }
317         return monitorBuilder.with(new MonitorSelect(true, true, true, true)).build();
318     }
319
320     private class UpdateMonitor implements MonitorCallBack {
321         Node node = null;
322         public UpdateMonitor(Node node) {
323             this.node = node;
324         }
325
326         @Override
327         public void update(TableUpdates result, DatabaseSchema dbSchema) {
328             ovsdbInventoryService.processTableUpdates(node, dbSchema.getName(), result);
329         }
330
331         @Override
332         public void exception(Throwable t) {
333             System.out.println("Exception t = " + t);
334         }
335     }
336
337     private String getConnectionIdentifier(OvsdbClient client) {
338         OvsdbConnectionInfo info = client.getConnectionInfo();
339         return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
340     }
341
342
343     @Override
344     public void connected(OvsdbClient client) {
345         String identifier = getConnectionIdentifier(client);
346         try {
347             this.handleNewConnection(identifier, client);
348         } catch (InterruptedException | ExecutionException e) {
349             e.printStackTrace();
350         }
351     }
352
353     @Override
354     public void disconnected(OvsdbClient client) {
355         Connection connection = ovsdbConnections.get(this.getConnectionIdentifier(client));
356         if (connection == null) return;
357         this.disconnect(connection.getNode());
358     }
359 }