Merge "Southbound-IT: add test to verify dpdk enabled switch."
[netvirt.git] / plugin / src / main / java / org / opendaylight / ovsdb / plugin / impl / ConnectionServiceImpl.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
9  */
10 package org.opendaylight.ovsdb.plugin.impl;
11
12 import io.netty.channel.ChannelHandler;
13
14 import java.io.IOException;
15 import java.net.InetAddress;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24
25 import org.opendaylight.ovsdb.lib.MonitorCallBack;
26 import org.opendaylight.ovsdb.lib.OvsdbClient;
27 import org.opendaylight.ovsdb.lib.OvsdbConnection;
28 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
29 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
30 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
31 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
32 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
33 import org.opendaylight.ovsdb.lib.message.TableUpdates;
34 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
35 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
36 import org.opendaylight.ovsdb.lib.schema.TableSchema;
37 import org.opendaylight.ovsdb.plugin.api.Connection;
38 import org.opendaylight.ovsdb.plugin.api.ConnectionConstants;
39 import org.opendaylight.ovsdb.plugin.api.Status;
40 import org.opendaylight.ovsdb.plugin.api.StatusCode;
41 import org.opendaylight.ovsdb.plugin.api.OvsdbConnectionService;
42 import org.opendaylight.ovsdb.plugin.api.OvsdbInventoryService;
43 import org.opendaylight.ovsdb.utils.config.ConfigProperties;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import com.google.common.collect.Lists;
49
50
51 /**
52  * Represents the openflow plugin component in charge of programming the flows
53  * the flow programming and relay them to functional modules above SAL.
54  */
55 public class ConnectionServiceImpl implements OvsdbConnectionService,
56                                               OvsdbConnectionListener {
57     protected static final Logger logger = LoggerFactory.getLogger(ConnectionServiceImpl.class);
58
59     // Properties that can be set in config.ini
60     private static final Integer DEFAULT_OVSDB_PORT = 6640;
61     private static final String OVSDB_LISTENPORT = "ovsdb.listenPort";
62
63
64     public void putOvsdbConnection (String identifier, Connection connection) {
65         ovsdbConnections.put(identifier, connection);
66     }
67
68     private ConcurrentMap<String, Connection> ovsdbConnections = new ConcurrentHashMap<String, Connection>();
69     private List<ChannelHandler> handlers = null;
70
71     private volatile OvsdbInventoryService ovsdbInventoryService;
72     private volatile OvsdbConnection connectionLib;
73
74     public void setOvsdbInventoryService(OvsdbInventoryService inventoryService) {
75         this.ovsdbInventoryService = inventoryService;
76     }
77
78     public void setOvsdbConnection(OvsdbConnection ovsdbConnection) {
79         this.connectionLib = ovsdbConnection;
80     }
81
82     public void init() {
83     }
84
85     /**
86      * Function called by the dependency manager when at least one dependency
87      * become unsatisfied or when the component is shutting down because for
88      * example bundle is being stopped.
89      */
90     void destroy() {
91     }
92
93     /**
94      * Function called by dependency manager after "init ()" is called and after
95      * the services provided by the class are registered in the service registry
96      */
97     void start() {
98         /* Start ovsdb server before getting connection clients */
99         String portString = ConfigProperties.getProperty(OvsdbConnectionService.class, OVSDB_LISTENPORT);
100         int ovsdbListenPort = DEFAULT_OVSDB_PORT;
101         if (portString != null) {
102             ovsdbListenPort = Integer.decode(portString).intValue();
103         }
104
105         if (!connectionLib.startOvsdbManager(ovsdbListenPort)) {
106             logger.warn("Start OVSDB manager call from ConnectionService was not necessary");
107         }
108
109         /* Then get connection clients */
110         Collection<OvsdbClient> connections = connectionLib.getConnections();
111         for (OvsdbClient client : connections) {
112             logger.info("CONNECT start connected clients client = {}", client);
113             this.connected(client);
114         }
115     }
116
117     /**
118      * Function called by the dependency manager before the services exported by
119      * the component are unregistered, this will be followed by a "destroy ()"
120      * calls
121      */
122     void stopping() {
123         for (Connection connection : ovsdbConnections.values()) {
124             connection.disconnect();
125         }
126     }
127
128     public Status disconnect(Node node) {
129         Connection connection = getConnection(node);
130         if (connection != null) {
131             ovsdbConnections.remove(normalizeId(node.getId().getValue()));
132             connection.disconnect();
133             ovsdbInventoryService.removeNode(node);
134             return new Status(StatusCode.SUCCESS);
135         } else {
136             return new Status(StatusCode.NOTFOUND);
137         }
138     }
139
140     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
141         InetAddress address;
142         Integer port;
143
144         try {
145             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
146         } catch (Exception e) {
147             logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
148             return null;
149         }
150
151         try {
152             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
153             if (port == 0) port = DEFAULT_OVSDB_PORT;
154         } catch (Exception e) {
155             port = DEFAULT_OVSDB_PORT;
156         }
157
158         try {
159             OvsdbClient client = connectionLib.connect(address, port);
160             return handleNewConnection(identifier, client);
161         } catch (InterruptedException e) {
162             logger.error("Thread was interrupted during connect", e);
163         } catch (ExecutionException e) {
164             logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
165         }
166         return null;
167     }
168
169     public List<ChannelHandler> getHandlers() {
170         return handlers;
171     }
172
173     public void setHandlers(List<ChannelHandler> handlers) {
174         this.handlers = handlers;
175     }
176
177     private String normalizeId (String identifier) {
178         String id = identifier;
179
180         String[] pair = identifier.split("\\|");
181         if (pair[0].equals("OVS")) {
182             id = pair[1];
183         }
184
185         return id;
186     }
187
188     @Override
189     public Connection getConnection(Node node) {
190         return ovsdbConnections.get(normalizeId(node.getId().getValue()));
191     }
192
193     @Override
194     public Node getNode (String identifier) {
195         Connection connection = ovsdbConnections.get(normalizeId(identifier));
196         if (connection != null) {
197             return connection.getNode();
198         } else {
199             return null;
200         }
201     }
202
203     @Override
204     public List<Node> getNodes() {
205         List<Node> nodes = new ArrayList<Node>();
206         for (Connection connection : ovsdbConnections.values()) {
207             nodes.add(connection.getNode());
208         }
209         return nodes;
210     }
211
212     private Node handleNewConnection(String identifier, OvsdbClient client) throws InterruptedException, ExecutionException {
213         Connection connection = new Connection(identifier, client);
214         Node node = connection.getNode();
215         ovsdbConnections.put(identifier, connection);
216         List<String> dbs = client.getDatabases().get();
217         for (String db : dbs) {
218             client.getSchema(db).get();
219         }
220         // Keeping the Initial inventory update(s) on its own thread.
221         new Thread() {
222             Connection connection;
223             String identifier;
224
225             @Override
226             public void run() {
227                 try {
228                     logger.info("Initialize inventory for {}", connection.toString());
229                     initializeInventoryForNewNode(connection);
230                 } catch (InterruptedException | ExecutionException | IOException e) {
231                     logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
232                     ovsdbConnections.remove(identifier);
233                 }
234             }
235             public Thread initializeConnectionParams(String identifier, Connection connection) {
236                 this.identifier = identifier;
237                 this.connection = connection;
238                 return this;
239             }
240         }.initializeConnectionParams(identifier, connection).start();
241         return node;
242     }
243
244     public void channelClosed(Node node) throws Exception {
245         logger.info("Connection to Node : {} closed", node);
246         disconnect(node);
247         ovsdbInventoryService.removeNode(node);
248     }
249
250     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException, IOException {
251         OvsdbClient client = connection.getClient();
252         InetAddress address = client.getConnectionInfo().getRemoteAddress();
253         int port = client.getConnectionInfo().getRemotePort();
254
255         List<String> databases = client.getDatabases().get();
256         if (databases == null) {
257             logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
258             return;
259         }
260         for (String database : databases) {
261             DatabaseSchema dbSchema = client.getSchema(database).get();
262             TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
263             ovsdbInventoryService.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
264         }
265         logger.info("Notifying Inventory Listeners for Node Added: {}", connection.getNode().toString());
266         ovsdbInventoryService.notifyNodeAdded(connection.getNode(), address, port);
267     }
268
269     public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
270         Connection connection = getConnection(node);
271         OvsdbClient client = connection.getClient();
272         if (dbSchema == null) {
273             logger.error("Unable to get Database Schema for the ovsdb connection : {}", client.getConnectionInfo());
274             return null;
275         }
276         Set<String> tables = dbSchema.getTables();
277         if (tables == null) {
278             logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
279             return null;
280         }
281         List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
282         for (String tableName : tables) {
283             GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
284             monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
285         }
286         return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
287     }
288
289     /**
290      * As per RFC 7047, section 4.1.5, if a Monitor request is sent without any columns, the update response will not include
291      * the _uuid column.
292      * ----------------------------------------------------------------------------------------------------------------------------------
293      * Each &lt;monitor-request&gt; specifies one or more columns and the manner in which the columns (or the entire table) are to be monitored.
294      * The "columns" member specifies the columns whose values are monitored. It MUST NOT contain duplicates.
295      * If "columns" is omitted, all columns in the table, except for "_uuid", are monitored.
296      * ----------------------------------------------------------------------------------------------------------------------------------
297      * In order to overcome this limitation, this method
298      *
299      * @return MonitorRequest that includes all the Bridge Columns including _uuid
300      */
301     public <T extends TableSchema<T>> MonitorRequest<T> getAllColumnsMonitorRequest (T tableSchema) {
302         Set<String> columns = tableSchema.getColumns();
303         MonitorRequestBuilder<T> monitorBuilder = MonitorRequestBuilder.builder(tableSchema);
304         for (String column : columns) {
305             monitorBuilder.addColumn(column);
306         }
307         return monitorBuilder.with(new MonitorSelect(true, true, true, true)).build();
308     }
309
310     private class UpdateMonitor implements MonitorCallBack {
311         Node node = null;
312         public UpdateMonitor(Node node) {
313             this.node = node;
314         }
315
316         @Override
317         public void update(TableUpdates result, DatabaseSchema dbSchema) {
318             ovsdbInventoryService.processTableUpdates(node, dbSchema.getName(), result);
319         }
320
321         @Override
322         public void exception(Throwable t) {
323             System.out.println("Exception t = " + t);
324         }
325     }
326
327     private String getConnectionIdentifier(OvsdbClient client) {
328         OvsdbConnectionInfo info = client.getConnectionInfo();
329         return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
330     }
331
332
333     @Override
334     public void connected(OvsdbClient client) {
335         String identifier = getConnectionIdentifier(client);
336         try {
337             this.handleNewConnection(identifier, client);
338         } catch (InterruptedException | ExecutionException e) {
339             e.printStackTrace();
340         }
341     }
342
343     @Override
344     public void disconnected(OvsdbClient client) {
345         Connection connection = ovsdbConnections.get(this.getConnectionIdentifier(client));
346         if (connection == null) return;
347         this.disconnect(connection.getNode());
348     }
349 }