2 * Copyright (c) 2013, 2015 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.plugin.impl;
11 import io.netty.channel.ChannelHandler;
13 import java.io.IOException;
14 import java.net.InetAddress;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
24 import org.opendaylight.ovsdb.lib.MonitorCallBack;
25 import org.opendaylight.ovsdb.lib.OvsdbClient;
26 import org.opendaylight.ovsdb.lib.OvsdbConnection;
27 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
28 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
29 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
30 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
31 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
32 import org.opendaylight.ovsdb.lib.message.TableUpdates;
33 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
34 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
35 import org.opendaylight.ovsdb.lib.schema.TableSchema;
36 import org.opendaylight.ovsdb.plugin.api.Connection;
37 import org.opendaylight.ovsdb.plugin.api.ConnectionConstants;
38 import org.opendaylight.ovsdb.plugin.api.Status;
39 import org.opendaylight.ovsdb.plugin.api.StatusCode;
40 import org.opendaylight.ovsdb.plugin.api.OvsdbConnectionService;
41 import org.opendaylight.ovsdb.plugin.api.OvsdbInventoryService;
42 import org.opendaylight.ovsdb.utils.config.ConfigProperties;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import com.google.common.collect.Lists;
 * Manages OVSDB connections for the plugin: starts the OVSDB manager listener,
 * tracks active connections by identifier, and keeps the inventory service informed.
54 public class ConnectionServiceImpl implements OvsdbConnectionService,
55 OvsdbConnectionListener {
56 private static final Logger LOG = LoggerFactory.getLogger(ConnectionServiceImpl.class);
58 // Properties that can be set in config.ini
59 private static final Integer DEFAULT_OVSDB_PORT = 6640;
60 private static final String OVSDB_LISTENPORT = "ovsdb.listenPort";
63 public void putOvsdbConnection (String identifier, Connection connection) {
64 ovsdbConnections.put(identifier, connection);
// Tracked connections, keyed by connection identifier.
67 private ConcurrentMap<String, Connection> ovsdbConnections = new ConcurrentHashMap<String, Connection>();
// Channel handlers supplied through setHandlers(); null until a list is set.
68 private List<ChannelHandler> handlers = null;
// Collaborators assigned through setOvsdbInventoryService() and setOvsdbConnection().
70 private volatile OvsdbInventoryService ovsdbInventoryService;
71 private volatile OvsdbConnection connectionLib;
73 public void setOvsdbInventoryService(OvsdbInventoryService inventoryService) {
74 this.ovsdbInventoryService = inventoryService;
77 public void setOvsdbConnection(OvsdbConnection ovsdbConnection) {
78 this.connectionLib = ovsdbConnection;
 * Function called by the dependency manager when at least one dependency
 * becomes unsatisfied or when the component is shutting down, for example
 * because the bundle is being stopped.
93 * Function called by dependency manager after "init ()" is called and after
94 * the services provided by the class are registered in the service registry
97 /* Start ovsdb server before getting connection clients */
98 String portString = ConfigProperties.getProperty(OvsdbConnectionService.class, OVSDB_LISTENPORT);
99 int ovsdbListenPort = DEFAULT_OVSDB_PORT;
100 if (portString != null) {
101 ovsdbListenPort = Integer.parseInt(portString);
104 if (!connectionLib.startOvsdbManager(ovsdbListenPort)) {
105 LOG.warn("Start OVSDB manager call from ConnectionService was not necessary");
108 /* Then get connection clients */
109 Collection<OvsdbClient> connections = connectionLib.getConnections();
110 for (OvsdbClient client : connections) {
111 LOG.info("CONNECT start connected clients client = {}", client);
112 this.connected(client);
117 * Function called by the dependency manager before the services exported by
118 * the component are unregistered, this will be followed by a "destroy ()"
122 for (Connection connection : ovsdbConnections.values()) {
123 connection.disconnect();
127 public Status disconnect(Node node) {
128 Connection connection = getConnection(node);
129 if (connection != null) {
130 ovsdbConnections.remove(normalizeId(node.getId().getValue()));
131 connection.disconnect();
132 ovsdbInventoryService.removeNode(node);
133 return new Status(StatusCode.SUCCESS);
135 return new Status(StatusCode.NOTFOUND);
/**
 * Opens an OVSDB client connection to the address and port found in
 * {@code params} and hands the client to handleNewConnection, which stores
 * the connection under {@code identifier}.
 *
 * @param identifier identifier passed to handleNewConnection for the new connection
 * @param params connection parameters; the ADDRESS and PORT entries are read
 * @return the result of handleNewConnection when connecting succeeds;
 *         NOTE(review): the value returned on failure is not visible in this excerpt
 */
139 public Node connect(String identifier, Map<ConnectionConstants, String> params) {
// Resolve the ADDRESS parameter; a failure is logged together with the raw value.
144 address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
145 } catch (Exception e) {
146 LOG.error("Unable to resolve {}", params.get(ConnectionConstants.ADDRESS), e);
// Parse the PORT parameter; any exception while parsing (for example a missing or
// non-numeric value) falls back to DEFAULT_OVSDB_PORT.
151 port = Integer.parseInt(params.get(ConnectionConstants.PORT));
153 port = DEFAULT_OVSDB_PORT;
155 } catch (Exception e) {
156 port = DEFAULT_OVSDB_PORT;
// Connect through the connection library and register the resulting client.
160 OvsdbClient client = connectionLib.connect(address, port);
161 return handleNewConnection(identifier, client);
162 } catch (InterruptedException e) {
// NOTE(review): the interruption is only logged; the thread's interrupt flag is not restored here.
163 LOG.error("Thread was interrupted during connect", e);
164 } catch (ExecutionException e) {
165 LOG.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
170 public List<ChannelHandler> getHandlers() {
174 public void setHandlers(List<ChannelHandler> handlers) {
175 this.handlers = handlers;
/**
 * Converts a node identifier into the key used for lookups in the connection map.
 *
 * NOTE(review): only the "OVS" prefix test is visible in this excerpt; presumably the
 * part after the '|' separator is used when the prefix matches — confirm against the
 * full method.
 */
178 private String normalizeId (String identifier) {
179 String id = identifier;
// String.split takes a regular expression, so '|' is escaped to match it literally.
181 String[] pair = identifier.split("\\|");
182 if (pair[0].equals("OVS")) {
190 public Connection getConnection(Node node) {
191 return ovsdbConnections.get(normalizeId(node.getId().getValue()));
/**
 * Looks up the node of the connection tracked under the given identifier.
 *
 * @param identifier identifier to look up; it is normalized with normalizeId first
 * @return the connection's node when a connection is found;
 *         NOTE(review): the not-found result is outside this excerpt — presumably null, confirm
 */
195 public Node getNode (String identifier) {
196 Connection connection = ovsdbConnections.get(normalizeId(identifier));
197 if (connection != null) {
198 return connection.getNode();
/**
 * Collects the node of every tracked connection into a newly created list.
 * NOTE(review): the return statement is outside this excerpt — presumably the list built here is returned.
 */
205 public List<Node> getNodes() {
206 List<Node> nodes = new ArrayList<>();
207 for (Connection connection : ovsdbConnections.values()) {
208 nodes.add(connection.getNode());
/**
 * Registers a newly connected client and starts the initial inventory load for it
 * on a separate thread.
 *
 * @param identifier key under which the new connection is stored in ovsdbConnections
 * @param client the connected OVSDB client
 * @return NOTE(review): the return statement is not visible in this excerpt — presumably the
 *         node obtained from the new connection
 * @throws InterruptedException propagated from the blocking get() calls on the database list and schemas
 * @throws ExecutionException propagated from the blocking get() calls on the database list and schemas
 */
213 private Node handleNewConnection(String identifier, OvsdbClient client) throws InterruptedException, ExecutionException {
214 Connection connection = new Connection(identifier, client);
215 Node node = connection.getNode();
// The connection is stored before the database and schema requests below are made.
216 ovsdbConnections.put(identifier, connection);
// Wait for the database list and for each database's schema; the schema results are discarded here.
217 List<String> dbs = client.getDatabases().get();
218 for (String db : dbs) {
219 client.getSchema(db).get();
221 // Keeping the Initial inventory update(s) on its own thread.
223 Connection connection;
229 LOG.info("Initialize inventory for {}", connection.toString());
230 initializeInventoryForNewNode(connection);
231 } catch (InterruptedException | ExecutionException | IOException e) {
// A failed inventory initialization is logged and the connection is dropped from the map again.
232 LOG.error("Failed to initialize inventory for node with identifier {}", identifier, e);
233 ovsdbConnections.remove(identifier);
// Stores the identifier and connection on the thread object; it returns a Thread so that
// start() can be chained directly onto the call below.
236 public Thread initializeConnectionParams(String identifier, Connection connection) {
237 this.identifier = identifier;
238 this.connection = connection;
241 }.initializeConnectionParams(identifier, connection).start();
/**
 * Handles a closed channel for the given node: logs the closure and removes the
 * node from the inventory service.
 *
 * @param node node whose channel was closed
 */
245 public void channelClosed(Node node) throws Exception {
246 LOG.info("Connection to Node : {} closed", node);
248 ovsdbInventoryService.removeNode(node);
/**
 * Performs the initial inventory load for a connection: for every database reported
 * by the client it fetches the schema, starts monitoring the tables through
 * monitorTables, passes the returned updates to the inventory service, and finally
 * notifies the inventory service that the node was added.
 *
 * @param connection connection whose client and node are used
 * @throws InterruptedException propagated from the blocking get() calls and from monitorTables
 * @throws ExecutionException propagated from the blocking get() calls and from monitorTables
 * @throws IOException propagated from monitorTables
 */
251 private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException, IOException {
252 OvsdbClient client = connection.getClient();
// Remote endpoint of the client; passed along with the node-added notification at the end.
253 InetAddress address = client.getConnectionInfo().getRemoteAddress();
254 int port = client.getConnectionInfo().getRemotePort();
256 List<String> databases = client.getDatabases().get();
257 if (databases == null) {
258 LOG.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
// Per database: fetch the schema, set up monitoring, and feed the returned updates to the inventory.
261 for (String database : databases) {
262 DatabaseSchema dbSchema = client.getSchema(database).get();
263 TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
264 ovsdbInventoryService.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
266 LOG.info("Notifying Inventory Listeners for Node Added: {}", connection.getNode().toString());
267 ovsdbInventoryService.notifyNodeAdded(connection.getNode(), address, port);
/**
 * Requests monitoring of every table in the given database schema on the node's
 * client, asking for all columns of each table, and registers an UpdateMonitor
 * for the node as the monitor callback.
 *
 * @param node node whose connection is used for the monitor request
 * @param dbSchema schema of the database to monitor
 * @return the TableUpdates returned by the client's monitor call
 */
270 public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
// NOTE(review): getConnection returns null when the node has no tracked connection;
// the getClient() call below is not guarded against that.
271 Connection connection = getConnection(node);
272 OvsdbClient client = connection.getClient();
273 if (dbSchema == null) {
274 LOG.error("Unable to get Database Schema for the ovsdb connection : {}", client.getConnectionInfo());
277 Set<String> tables = dbSchema.getTables();
278 if (tables == null) {
279 LOG.warn("Database {} without any tables. Strange !", dbSchema.getName());
// One monitor request per table, each listing all of the table's columns.
282 List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
283 for (String tableName : tables) {
284 GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
285 monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
287 return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
 * As per RFC 7047, section 4.1.5, if a Monitor request is sent without any columns, the update response will not include the _uuid column.
293 * ----------------------------------------------------------------------------------------------------------------------------------
294 * Each <monitor-request> specifies one or more columns and the manner in which the columns (or the entire table) are to be monitored.
295 * The "columns" member specifies the columns whose values are monitored. It MUST NOT contain duplicates.
296 * If "columns" is omitted, all columns in the table, except for "_uuid", are monitored.
297 * ----------------------------------------------------------------------------------------------------------------------------------
 * In order to overcome this limitation, this method explicitly adds every column of the given table schema to the request.
 * @return MonitorRequest that includes all columns of the given table, including _uuid
302 public <T extends TableSchema<T>> MonitorRequest<T> getAllColumnsMonitorRequest (T tableSchema) {
303 Set<String> columns = tableSchema.getColumns();
304 MonitorRequestBuilder<T> monitorBuilder = MonitorRequestBuilder.builder(tableSchema);
305 for (String column : columns) {
306 monitorBuilder.addColumn(column);
308 return monitorBuilder.with(new MonitorSelect(true, true, true, true)).build();
311 private class UpdateMonitor implements MonitorCallBack {
313 public UpdateMonitor(Node node) {
318 public void update(TableUpdates result, DatabaseSchema dbSchema) {
319 ovsdbInventoryService.processTableUpdates(node, dbSchema.getName(), result);
323 public void exception(Throwable t) {
324 System.out.println("Exception t = " + t);
328 private String getConnectionIdentifier(OvsdbClient client) {
329 OvsdbConnectionInfo info = client.getConnectionInfo();
330 return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
/**
 * Handles a connected client: derives its identifier from the remote address and
 * port and registers it through handleNewConnection. Also invoked by this class
 * for each client returned by the connection library's getConnections().
 *
 * @param client the connected OVSDB client
 */
335 public void connected(OvsdbClient client) {
336 String identifier = getConnectionIdentifier(client);
338 this.handleNewConnection(identifier, client);
// NOTE(review): the body of this catch block is not visible in this excerpt.
339 } catch (InterruptedException | ExecutionException e) {
345 public void disconnected(OvsdbClient client) {
346 Connection connection = ovsdbConnections.get(this.getConnectionIdentifier(client));
347 if (connection == null) {
350 this.disconnect(connection.getNode());