2 * Copyright (C) 2013 Red Hat, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
10 package org.opendaylight.ovsdb.plugin;
12 import io.netty.channel.ChannelHandler;
14 import java.io.IOException;
15 import java.net.InetAddress;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
26 import org.opendaylight.controller.sal.connection.ConnectionConstants;
27 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
28 import org.opendaylight.controller.sal.core.Node;
29 import org.opendaylight.controller.sal.core.Property;
30 import org.opendaylight.controller.sal.utils.Status;
31 import org.opendaylight.controller.sal.utils.StatusCode;
32 import org.opendaylight.ovsdb.lib.MonitorCallBack;
33 import org.opendaylight.ovsdb.lib.OvsdbClient;
34 import org.opendaylight.ovsdb.lib.OvsdbConnection;
35 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
36 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
37 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
38 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
39 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
40 import org.opendaylight.ovsdb.lib.message.TableUpdates;
41 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
42 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
43 import org.opendaylight.ovsdb.lib.schema.TableSchema;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import com.google.common.collect.Lists;
/**
 * Represents the OVSDB plugin component in charge of tracking the connections
 * to OVSDB servers and relaying their inventory updates to functional modules above SAL.
 */
54 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbConnectionListener {
55 protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
57 // Properties that can be set in config.ini
58 private static final Integer defaultOvsdbPort = 6640;
60 private OvsdbConnection connectionLib;
61 private ConcurrentMap<String, Connection> ovsdbConnections = new ConcurrentHashMap<String, Connection>();
62 private List<ChannelHandler> handlers = null;
63 private InventoryServiceInternal inventoryServiceInternal;
65 public InventoryServiceInternal getInventoryServiceInternal() {
66 return inventoryServiceInternal;
69 public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
70 this.inventoryServiceInternal = inventoryServiceInternal;
73 public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
74 if (this.inventoryServiceInternal == inventoryServiceInternal) {
75 this.inventoryServiceInternal = null;
79 public void setOvsdbConnection(OvsdbConnection connectionService) {
80 connectionLib = connectionService;
81 // It is not correct to register the service here. Rather, we should depend on the
82 // Service created by createServiceDependency() and hook to it via Apache DM.
83 // Using this temporarily till the Service Dependency is resolved.
84 connectionLib.registerForPassiveConnection(this);
87 public void unsetOvsdbConnection(OvsdbConnection connectionService) {
95 * Function called by the dependency manager when at least one dependency
96 * become unsatisfied or when the component is shutting down because for
97 * example bundle is being stopped.
103 * Function called by dependency manager after "init ()" is called and after
104 * the services provided by the class are registered in the service registry
107 Collection<OvsdbClient> connections = connectionLib.getConnections();
108 for (OvsdbClient client : connections) {
109 this.connected(client);
114 * Function called by the dependency manager before the services exported by
115 * the component are unregistered, this will be followed by a "destroy ()"
119 for (Connection connection : ovsdbConnections.values()) {
120 connection.disconnect();
125 public Status disconnect(Node node) {
126 String identifier = (String) node.getID();
127 Connection connection = ovsdbConnections.get(identifier);
128 if (connection != null) {
129 ovsdbConnections.remove(identifier);
130 connection.disconnect();
131 inventoryServiceInternal.removeNode(node);
132 return new Status(StatusCode.SUCCESS);
134 return new Status(StatusCode.NOTFOUND);
139 public Node connect(String identifier, Map<ConnectionConstants, String> params) {
144 address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
145 } catch (Exception e) {
146 logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
151 port = Integer.parseInt(params.get(ConnectionConstants.PORT));
152 if (port == 0) port = defaultOvsdbPort;
153 } catch (Exception e) {
154 port = defaultOvsdbPort;
158 OvsdbClient client = connectionLib.connect(address, port);
159 return handleNewConnection(identifier, client);
160 } catch (InterruptedException e) {
161 logger.error("Thread was interrupted during connect", e);
162 } catch (ExecutionException e) {
163 logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
168 public List<ChannelHandler> getHandlers() {
172 public void setHandlers(List<ChannelHandler> handlers) {
173 this.handlers = handlers;
177 public Connection getConnection(Node node) {
178 String identifier = (String) node.getID();
179 return ovsdbConnections.get(identifier);
183 public List<Node> getNodes() {
184 List<Node> nodes = new ArrayList<Node>();
185 for (Connection connection : ovsdbConnections.values()) {
186 nodes.add(connection.getNode());
192 public void notifyClusterViewChanged() {
196 public void notifyNodeDisconnectFromMaster(Node arg0) {
199 private Node handleNewConnection(String identifier, OvsdbClient client) throws InterruptedException, ExecutionException {
200 Connection connection = new Connection(identifier, client);
201 Node node = connection.getNode();
202 ovsdbConnections.put(identifier, connection);
203 List<String> dbs = client.getDatabases().get();
204 for (String db : dbs) {
205 client.getSchema(db).get();
207 // Keeping the Initial inventory update(s) on its own thread.
209 Connection connection;
215 initializeInventoryForNewNode(connection);
216 } catch (InterruptedException | ExecutionException | IOException e) {
217 logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
218 ovsdbConnections.remove(identifier);
221 public Thread initializeConnectionParams(String identifier, Connection connection) {
222 this.identifier = identifier;
223 this.connection = connection;
226 }.initializeConnectionParams(identifier, connection).start();
230 public void channelClosed(Node node) throws Exception {
231 logger.info("Connection to Node : {} closed", node);
233 inventoryServiceInternal.removeNode(node);
236 private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException, IOException {
237 OvsdbClient client = connection.getClient();
238 InetAddress address = client.getConnectionInfo().getRemoteAddress();
239 int port = client.getConnectionInfo().getRemotePort();
240 IPAddressProperty addressProp = new IPAddressProperty(address);
241 L4PortProperty l4Port = new L4PortProperty(port);
242 Set<Property> props = new HashSet<Property>();
243 props.add(addressProp);
245 inventoryServiceInternal.addNode(connection.getNode(), props);
247 List<String> databases = client.getDatabases().get();
248 if (databases == null) {
249 logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
252 for (String database : databases) {
253 DatabaseSchema dbSchema = client.getSchema(database).get();
254 TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
255 inventoryServiceInternal.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
257 inventoryServiceInternal.notifyNodeAdded(connection.getNode());
260 public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
261 String identifier = (String) node.getID();
262 Connection connection = ovsdbConnections.get(identifier);
263 OvsdbClient client = connection.getClient();
264 if (dbSchema == null) {
265 logger.error("Unable to get Database Schema for the ovsdb connection : {}", client.getConnectionInfo());
268 Set<String> tables = dbSchema.getTables();
269 if (tables == null) {
270 logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
273 List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
274 for (String tableName : tables) {
275 GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
276 monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
278 return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
/**
 * As per RFC 7047, section 4.1.5, if a Monitor request is sent without any columns, the update response will not include
 * the _uuid column.
 * ----------------------------------------------------------------------------------------------------------------------------------
 * Each {@code <monitor-request>} specifies one or more columns and the manner in which the columns (or the entire table) are to be monitored.
 * The "columns" member specifies the columns whose values are monitored. It MUST NOT contain duplicates.
 * If "columns" is omitted, all columns in the table, except for "_uuid", are monitored.
 * ----------------------------------------------------------------------------------------------------------------------------------
 * In order to overcome this limitation, this method explicitly adds every column of the given table schema to the request.
 *
 * @param tableSchema schema of the table to be monitored
 * @return MonitorRequest that includes all the columns of the table, including _uuid
 */
293 public <T extends TableSchema<T>> MonitorRequest<T> getAllColumnsMonitorRequest (T tableSchema) {
294 Set<String> columns = tableSchema.getColumns();
295 MonitorRequestBuilder<T> monitorBuilder = MonitorRequestBuilder.builder(tableSchema);
296 for (String column : columns) {
297 monitorBuilder.addColumn(column);
299 return monitorBuilder.with(new MonitorSelect(true, true, true, true)).build();
302 private class UpdateMonitor implements MonitorCallBack {
304 public UpdateMonitor(Node node) {
309 public void update(TableUpdates result, DatabaseSchema dbSchema) {
310 inventoryServiceInternal.processTableUpdates(node, dbSchema.getName(), result);
314 public void exception(Throwable t) {
315 System.out.println("Exception t = " + t);
319 private String getConnectionIdentifier(OvsdbClient client) {
320 OvsdbConnectionInfo info = client.getConnectionInfo();
321 return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
326 public void connected(OvsdbClient client) {
327 String identifier = getConnectionIdentifier(client);
329 this.handleNewConnection(identifier, client);
330 } catch (InterruptedException | ExecutionException e) {
336 public void disconnected(OvsdbClient client) {
337 Connection connection = ovsdbConnections.get(this.getConnectionIdentifier(client));
338 if (connection == null) return;
339 this.disconnect(connection.getNode());