Merge "Added LBaaS/OS API calls to the project Postma collection"
[ovsdb.git] / plugin / src / main / java / org / opendaylight / ovsdb / plugin / ConnectionService.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
9  */
10 package org.opendaylight.ovsdb.plugin;
11
12 import io.netty.channel.ChannelHandler;
13
14 import java.io.IOException;
15 import java.net.InetAddress;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25
26 import org.opendaylight.controller.sal.connection.ConnectionConstants;
27 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
28 import org.opendaylight.controller.sal.core.Node;
29 import org.opendaylight.controller.sal.core.Property;
30 import org.opendaylight.controller.sal.utils.Status;
31 import org.opendaylight.controller.sal.utils.StatusCode;
32 import org.opendaylight.ovsdb.lib.MonitorCallBack;
33 import org.opendaylight.ovsdb.lib.OvsdbClient;
34 import org.opendaylight.ovsdb.lib.OvsdbConnection;
35 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
36 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
37 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
38 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
39 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
40 import org.opendaylight.ovsdb.lib.message.TableUpdates;
41 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
42 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
43 import org.opendaylight.ovsdb.lib.schema.TableSchema;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import com.google.common.collect.Lists;
48
49
50 /**
51  * Represents the openflow plugin component in charge of programming the flows
52  * the flow programming and relay them to functional modules above SAL.
53  */
54 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbConnectionListener {
55     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
56
57     // Properties that can be set in config.ini
58     private static final Integer defaultOvsdbPort = 6640;
59
60     private OvsdbConnection connectionLib;
61     private ConcurrentMap<String, Connection> ovsdbConnections = new ConcurrentHashMap<String, Connection>();
62     private List<ChannelHandler> handlers = null;
63     private InventoryServiceInternal inventoryServiceInternal;
64
65     public InventoryServiceInternal getInventoryServiceInternal() {
66         return inventoryServiceInternal;
67     }
68
69     public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
70         this.inventoryServiceInternal = inventoryServiceInternal;
71     }
72
73     public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
74         if (this.inventoryServiceInternal == inventoryServiceInternal) {
75             this.inventoryServiceInternal = null;
76         }
77     }
78
79     public void setOvsdbConnection(OvsdbConnection connectionService) {
80         connectionLib = connectionService;
81         // It is not correct to register the service here. Rather, we should depend on the
82         // Service created by createServiceDependency() and hook to it via Apache DM.
83         // Using this temporarily till the Service Dependency is resolved.
84         connectionLib.registerForPassiveConnection(this);
85     }
86
87     public void unsetOvsdbConnection(OvsdbConnection connectionService) {
88         connectionLib = null;
89     }
90
91     public void init() {
92     }
93
94     /**
95      * Function called by the dependency manager when at least one dependency
96      * become unsatisfied or when the component is shutting down because for
97      * example bundle is being stopped.
98      */
99     void destroy() {
100     }
101
102     /**
103      * Function called by dependency manager after "init ()" is called and after
104      * the services provided by the class are registered in the service registry
105      */
106     void start() {
107         Collection<OvsdbClient> connections = connectionLib.getConnections();
108         for (OvsdbClient client : connections) {
109             this.connected(client);
110         }
111     }
112
113     /**
114      * Function called by the dependency manager before the services exported by
115      * the component are unregistered, this will be followed by a "destroy ()"
116      * calls
117      */
118     void stopping() {
119         for (Connection connection : ovsdbConnections.values()) {
120             connection.disconnect();
121         }
122     }
123
124     @Override
125     public Status disconnect(Node node) {
126         String identifier = (String) node.getID();
127         Connection connection = ovsdbConnections.get(identifier);
128         if (connection != null) {
129             ovsdbConnections.remove(identifier);
130             connection.disconnect();
131             inventoryServiceInternal.removeNode(node);
132             return new Status(StatusCode.SUCCESS);
133         } else {
134             return new Status(StatusCode.NOTFOUND);
135         }
136     }
137
138     @Override
139     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
140         InetAddress address;
141         Integer port;
142
143         try {
144             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
145         } catch (Exception e) {
146             logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
147             return null;
148         }
149
150         try {
151             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
152             if (port == 0) port = defaultOvsdbPort;
153         } catch (Exception e) {
154             port = defaultOvsdbPort;
155         }
156
157         try {
158             OvsdbClient client = connectionLib.connect(address, port);
159             return handleNewConnection(identifier, client);
160         } catch (InterruptedException e) {
161             logger.error("Thread was interrupted during connect", e);
162         } catch (ExecutionException e) {
163             logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
164         }
165         return null;
166     }
167
168     public List<ChannelHandler> getHandlers() {
169         return handlers;
170     }
171
172     public void setHandlers(List<ChannelHandler> handlers) {
173         this.handlers = handlers;
174     }
175
176     @Override
177     public Connection getConnection(Node node) {
178         String identifier = (String) node.getID();
179         return ovsdbConnections.get(identifier);
180     }
181
182     @Override
183     public List<Node> getNodes() {
184         List<Node> nodes = new ArrayList<Node>();
185         for (Connection connection : ovsdbConnections.values()) {
186             nodes.add(connection.getNode());
187         }
188         return nodes;
189     }
190
191     @Override
192     public void notifyClusterViewChanged() {
193     }
194
195     @Override
196     public void notifyNodeDisconnectFromMaster(Node arg0) {
197     }
198
199     private Node handleNewConnection(String identifier, OvsdbClient client) throws InterruptedException, ExecutionException {
200         Connection connection = new Connection(identifier, client);
201         Node node = connection.getNode();
202         ovsdbConnections.put(identifier, connection);
203         List<String> dbs = client.getDatabases().get();
204         for (String db : dbs) {
205             client.getSchema(db).get();
206         }
207         // Keeping the Initial inventory update(s) on its own thread.
208         new Thread() {
209             Connection connection;
210             String identifier;
211
212             @Override
213             public void run() {
214                 try {
215                     initializeInventoryForNewNode(connection);
216                 } catch (InterruptedException | ExecutionException | IOException e) {
217                     logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
218                     ovsdbConnections.remove(identifier);
219                 }
220             }
221             public Thread initializeConnectionParams(String identifier, Connection connection) {
222                 this.identifier = identifier;
223                 this.connection = connection;
224                 return this;
225             }
226         }.initializeConnectionParams(identifier, connection).start();
227         return node;
228     }
229
230     public void channelClosed(Node node) throws Exception {
231         logger.info("Connection to Node : {} closed", node);
232         disconnect(node);
233         inventoryServiceInternal.removeNode(node);
234     }
235
236     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException, IOException {
237         OvsdbClient client = connection.getClient();
238         InetAddress address = client.getConnectionInfo().getRemoteAddress();
239         int port = client.getConnectionInfo().getRemotePort();
240         IPAddressProperty addressProp = new IPAddressProperty(address);
241         L4PortProperty l4Port = new L4PortProperty(port);
242         Set<Property> props = new HashSet<Property>();
243         props.add(addressProp);
244         props.add(l4Port);
245         inventoryServiceInternal.addNode(connection.getNode(), props);
246
247         List<String> databases = client.getDatabases().get();
248         if (databases == null) {
249             logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
250             return;
251         }
252         for (String database : databases) {
253             DatabaseSchema dbSchema = client.getSchema(database).get();
254             TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
255             inventoryServiceInternal.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
256         }
257         inventoryServiceInternal.notifyNodeAdded(connection.getNode());
258     }
259
260     public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
261         String identifier = (String) node.getID();
262         Connection connection = ovsdbConnections.get(identifier);
263         OvsdbClient client = connection.getClient();
264         if (dbSchema == null) {
265             logger.error("Unable to get Database Schema for the ovsdb connection : {}", client.getConnectionInfo());
266             return null;
267         }
268         Set<String> tables = dbSchema.getTables();
269         if (tables == null) {
270             logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
271             return null;
272         }
273         List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
274         for (String tableName : tables) {
275             GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
276             monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
277         }
278         return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
279     }
280
281     /**
282      * As per RFC 7047, section 4.1.5, if a Monitor request is sent without any columns, the update response will not include
283      * the _uuid column.
284      * ----------------------------------------------------------------------------------------------------------------------------------
285      * Each <monitor-request> specifies one or more columns and the manner in which the columns (or the entire table) are to be monitored.
286      * The "columns" member specifies the columns whose values are monitored. It MUST NOT contain duplicates.
287      * If "columns" is omitted, all columns in the table, except for "_uuid", are monitored.
288      * ----------------------------------------------------------------------------------------------------------------------------------
289      * In order to overcome this limitation, this method
290      *
291      * @return MonitorRequest that includes all the Bridge Columns including _uuid
292      */
293     public <T extends TableSchema<T>> MonitorRequest<T> getAllColumnsMonitorRequest (T tableSchema) {
294         Set<String> columns = tableSchema.getColumns();
295         MonitorRequestBuilder<T> monitorBuilder = MonitorRequestBuilder.builder(tableSchema);
296         for (String column : columns) {
297             monitorBuilder.addColumn(column);
298         }
299         return monitorBuilder.with(new MonitorSelect(true, true, true, true)).build();
300     }
301
302     private class UpdateMonitor implements MonitorCallBack {
303         Node node = null;
304         public UpdateMonitor(Node node) {
305             this.node = node;
306         }
307
308         @Override
309         public void update(TableUpdates result, DatabaseSchema dbSchema) {
310             inventoryServiceInternal.processTableUpdates(node, dbSchema.getName(), result);
311         }
312
313         @Override
314         public void exception(Throwable t) {
315             System.out.println("Exception t = " + t);
316         }
317     }
318
319     private String getConnectionIdentifier(OvsdbClient client) {
320         OvsdbConnectionInfo info = client.getConnectionInfo();
321         return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
322     }
323
324
325     @Override
326     public void connected(OvsdbClient client) {
327         String identifier = getConnectionIdentifier(client);
328         try {
329             this.handleNewConnection(identifier, client);
330         } catch (InterruptedException | ExecutionException e) {
331             e.printStackTrace();
332         }
333     }
334
335     @Override
336     public void disconnected(OvsdbClient client) {
337         Connection connection = ovsdbConnections.get(this.getConnectionIdentifier(client));
338         if (connection == null) return;
339         this.disconnect(connection.getNode());
340     }
341 }