2 * Copyright (C) 2013 Red Hat, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
10 package org.opendaylight.ovsdb.plugin.impl;
12 import io.netty.channel.ChannelHandler;
14 import java.io.IOException;
15 import java.net.InetAddress;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
26 import org.opendaylight.controller.sal.core.Node;
27 import org.opendaylight.controller.sal.core.Property;
28 import org.opendaylight.ovsdb.lib.MonitorCallBack;
29 import org.opendaylight.ovsdb.lib.OvsdbClient;
30 import org.opendaylight.ovsdb.lib.OvsdbConnection;
31 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
32 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
33 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
34 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
35 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
36 import org.opendaylight.ovsdb.lib.message.TableUpdates;
37 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
38 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
39 import org.opendaylight.ovsdb.lib.schema.TableSchema;
40 import org.opendaylight.ovsdb.plugin.api.Connection;
41 import org.opendaylight.ovsdb.plugin.api.ConnectionConstants;
42 import org.opendaylight.ovsdb.plugin.api.Status;
43 import org.opendaylight.ovsdb.plugin.api.StatusCode;
44 import org.opendaylight.ovsdb.plugin.internal.IPAddressProperty;
45 import org.opendaylight.ovsdb.plugin.internal.L4PortProperty;
46 import org.opendaylight.ovsdb.plugin.api.OvsdbConnectionService;
47 import org.opendaylight.ovsdb.plugin.api.OvsdbInventoryService;
48 import org.opendaylight.ovsdb.utils.config.ConfigProperties;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 import com.google.common.collect.Lists;
 * Manages the OVSDB connections of the plugin: it tracks each connection by
 * identifier and relays monitored table updates to the OVSDB inventory service.
59 public class ConnectionServiceImpl implements OvsdbConnectionService,
60 OvsdbConnectionListener {
61 protected static final Logger logger = LoggerFactory.getLogger(ConnectionServiceImpl.class);
63 // Properties that can be set in config.ini
64 private static final Integer DEFAULT_OVSDB_PORT = 6640;
65 private static final String OVSDB_LISTENPORT = "ovsdb.listenPort";
68 public void putOvsdbConnection (String identifier, Connection connection) {
69 ovsdbConnections.put(identifier, connection);
72 private ConcurrentMap<String, Connection> ovsdbConnections = new ConcurrentHashMap<String, Connection>();
73 private List<ChannelHandler> handlers = null;
75 private volatile OvsdbInventoryService ovsdbInventoryService;
76 private volatile OvsdbConnection connectionLib;
78 public void setOvsdbInventoryService(OvsdbInventoryService inventoryService) {
79 this.ovsdbInventoryService = inventoryService;
82 public void setOvsdbConnection(OvsdbConnection ovsdbConnection) {
83 this.connectionLib = ovsdbConnection;
 * Function called by the dependency manager when at least one dependency
 * becomes unsatisfied or when the component is shutting down, for example
 * because the bundle is being stopped.
 * Function called by the dependency manager after "init ()" is called and after
 * the services provided by the class are registered in the service registry.
102 /* Start ovsdb server before getting connection clients */
103 String portString = ConfigProperties.getProperty(OvsdbConnectionService.class, OVSDB_LISTENPORT);
104 int ovsdbListenPort = DEFAULT_OVSDB_PORT;
105 if (portString != null) {
106 ovsdbListenPort = Integer.decode(portString).intValue();
109 if (!connectionLib.startOvsdbManager(ovsdbListenPort)) {
110 logger.warn("Start OVSDB manager call from ConnectionService was not necessary");
113 /* Then get connection clients */
114 Collection<OvsdbClient> connections = connectionLib.getConnections();
115 for (OvsdbClient client : connections) {
116 logger.info("CONNECT start connected clients client = {}", client);
117 this.connected(client);
 * Function called by the dependency manager before the services exported by
 * the component are unregistered; this will be followed by a "destroy ()" call.
// Close every connection that is currently registered.
for (Connection registered : ovsdbConnections.values()) {
    registered.disconnect();
}
132 public Status disconnect(Node node) {
133 String identifier = (String) node.getID();
134 Connection connection = ovsdbConnections.get(identifier);
135 if (connection != null) {
136 ovsdbConnections.remove(identifier);
137 connection.disconnect();
138 ovsdbInventoryService.removeNode(node);
139 return new Status(StatusCode.SUCCESS);
141 return new Status(StatusCode.NOTFOUND);
145 public Node connect(String identifier, Map<ConnectionConstants, String> params) {
150 address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
151 } catch (Exception e) {
152 logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
157 port = Integer.parseInt(params.get(ConnectionConstants.PORT));
158 if (port == 0) port = DEFAULT_OVSDB_PORT;
159 } catch (Exception e) {
160 port = DEFAULT_OVSDB_PORT;
164 OvsdbClient client = connectionLib.connect(address, port);
165 return handleNewConnection(identifier, client);
166 } catch (InterruptedException e) {
167 logger.error("Thread was interrupted during connect", e);
168 } catch (ExecutionException e) {
169 logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
174 public List<ChannelHandler> getHandlers() {
178 public void setHandlers(List<ChannelHandler> handlers) {
179 this.handlers = handlers;
183 public Connection getConnection(Node node) {
184 String identifier = (String) node.getID();
185 return ovsdbConnections.get(identifier);
189 public Node getNode (String identifier) {
190 String id = identifier;
192 String[] pair = identifier.split("\\|");
193 if (pair[0].equals("OVS")) {
197 Connection connection = ovsdbConnections.get(id);
198 if (connection != null) {
199 return connection.getNode();
206 public List<Node> getNodes() {
207 List<Node> nodes = new ArrayList<Node>();
208 for (Connection connection : ovsdbConnections.values()) {
209 nodes.add(connection.getNode());
214 private Node handleNewConnection(String identifier, OvsdbClient client) throws InterruptedException, ExecutionException {
215 Connection connection = new Connection(identifier, client);
216 Node node = connection.getNode();
217 ovsdbConnections.put(identifier, connection);
218 List<String> dbs = client.getDatabases().get();
219 for (String db : dbs) {
220 client.getSchema(db).get();
222 // Keeping the Initial inventory update(s) on its own thread.
224 Connection connection;
230 logger.info("Initialize inventory for {}", connection.toString());
231 initializeInventoryForNewNode(connection);
232 } catch (InterruptedException | ExecutionException | IOException e) {
233 logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
234 ovsdbConnections.remove(identifier);
237 public Thread initializeConnectionParams(String identifier, Connection connection) {
238 this.identifier = identifier;
239 this.connection = connection;
242 }.initializeConnectionParams(identifier, connection).start();
246 public void channelClosed(Node node) throws Exception {
247 logger.info("Connection to Node : {} closed", node);
249 ovsdbInventoryService.removeNode(node);
252 private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException, IOException {
253 OvsdbClient client = connection.getClient();
254 InetAddress address = client.getConnectionInfo().getRemoteAddress();
255 int port = client.getConnectionInfo().getRemotePort();
256 IPAddressProperty addressProp = new IPAddressProperty(address);
257 L4PortProperty l4Port = new L4PortProperty(port);
258 Set<Property> props = new HashSet<Property>();
259 props.add(addressProp);
261 logger.info("Add node to ovsdb inventory service {}", connection.getNode().toString());
262 ovsdbInventoryService.addNode(connection.getNode(), props);
264 List<String> databases = client.getDatabases().get();
265 if (databases == null) {
266 logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
269 for (String database : databases) {
270 DatabaseSchema dbSchema = client.getSchema(database).get();
271 TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
272 ovsdbInventoryService.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
274 logger.info("Notifying Inventory Listeners for Node Added: {}", connection.getNode().toString());
275 ovsdbInventoryService.notifyNodeAdded(connection.getNode(), address, port);
278 public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
279 String identifier = (String) node.getID();
280 Connection connection = ovsdbConnections.get(identifier);
281 OvsdbClient client = connection.getClient();
282 if (dbSchema == null) {
283 logger.error("Unable to get Database Schema for the ovsdb connection : {}", client.getConnectionInfo());
286 Set<String> tables = dbSchema.getTables();
287 if (tables == null) {
288 logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
291 List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
292 for (String tableName : tables) {
293 GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
294 monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
296 return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
300 * As per RFC 7047, section 4.1.5, if a Monitor request is sent without any columns, the update response will not include
302 * ----------------------------------------------------------------------------------------------------------------------------------
303 * Each <monitor-request> specifies one or more columns and the manner in which the columns (or the entire table) are to be monitored.
304 * The "columns" member specifies the columns whose values are monitored. It MUST NOT contain duplicates.
305 * If "columns" is omitted, all columns in the table, except for "_uuid", are monitored.
306 * ----------------------------------------------------------------------------------------------------------------------------------
307 * In order to overcome this limitation, this method
309 * @return MonitorRequest that includes all the Bridge Columns including _uuid
311 public <T extends TableSchema<T>> MonitorRequest<T> getAllColumnsMonitorRequest (T tableSchema) {
312 Set<String> columns = tableSchema.getColumns();
313 MonitorRequestBuilder<T> monitorBuilder = MonitorRequestBuilder.builder(tableSchema);
314 for (String column : columns) {
315 monitorBuilder.addColumn(column);
317 return monitorBuilder.with(new MonitorSelect(true, true, true, true)).build();
320 private class UpdateMonitor implements MonitorCallBack {
322 public UpdateMonitor(Node node) {
327 public void update(TableUpdates result, DatabaseSchema dbSchema) {
328 ovsdbInventoryService.processTableUpdates(node, dbSchema.getName(), result);
332 public void exception(Throwable t) {
333 System.out.println("Exception t = " + t);
337 private String getConnectionIdentifier(OvsdbClient client) {
338 OvsdbConnectionInfo info = client.getConnectionInfo();
339 return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
344 public void connected(OvsdbClient client) {
345 String identifier = getConnectionIdentifier(client);
347 this.handleNewConnection(identifier, client);
348 } catch (InterruptedException | ExecutionException e) {
354 public void disconnected(OvsdbClient client) {
355 Connection connection = ovsdbConnections.get(this.getConnectionIdentifier(client));
356 if (connection == null) return;
357 this.disconnect(connection.getNode());