Merge "L3: Add eth to br-ex"
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbClientImpl.java
1 /*
2  * Copyright (c) 2014, 2015 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.lib.impl;
10
11 import io.netty.channel.Channel;
12
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.UUID;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorService;
19
20 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
21 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
22 import org.opendaylight.ovsdb.lib.LockStolenCallback;
23 import org.opendaylight.ovsdb.lib.MonitorCallBack;
24 import org.opendaylight.ovsdb.lib.MonitorHandle;
25 import org.opendaylight.ovsdb.lib.OvsdbClient;
26 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
27 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
28 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
29 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
30 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
31 import org.opendaylight.ovsdb.lib.message.TableUpdate;
32 import org.opendaylight.ovsdb.lib.message.TableUpdates;
33 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
34 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
35 import org.opendaylight.ovsdb.lib.notation.Row;
36 import org.opendaylight.ovsdb.lib.operations.Operation;
37 import org.opendaylight.ovsdb.lib.operations.OperationResult;
38 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
39 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
40 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
41 import org.opendaylight.ovsdb.lib.schema.TableSchema;
42 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
43 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
44 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import com.fasterxml.jackson.databind.JsonNode;
49 import com.fasterxml.jackson.databind.node.ObjectNode;
50 import com.google.common.base.Function;
51 import com.google.common.collect.ImmutableMap;
52 import com.google.common.collect.Lists;
53 import com.google.common.collect.Maps;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.SettableFuture;
57
58
59 public class OvsdbClientImpl implements OvsdbClient {
60
61     private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
62     private ExecutorService executorService;
63     private OvsdbRPC rpc;
64     private Map<String, DatabaseSchema> schema = Maps.newHashMap();
65     private Map<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
66     private OvsdbRPC.Callback rpcCallback;
67     private OvsdbConnectionInfo connectionInfo;
68     private Channel channel;
69
70     public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type, ExecutorService executorService) {
71         this.rpc = rpc;
72         this.executorService = executorService;
73         this.channel = channel;
74
75         this.connectionInfo = new OvsdbConnectionInfo(channel, type);
76     }
77
78     OvsdbClientImpl() {
79     }
80
81     void setupUpdateListener() {
82         if (rpcCallback == null) {
83             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
84                 @Override
85                 public void update(Object node, UpdateNotification updateNotification) {
86                     Object key = updateNotification.getContext();
87                     CallbackContext callbackContext = monitorCallbacks.get(key);
88                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
89                     if (monitorCallBack == null) {
90                         //ignore ?
91                         LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
92                         return;
93                     }
94                     TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
95                             callbackContext.schema);
96                     monitorCallBack.update(updates, callbackContext.schema);
97                 }
98
99                 @Override
100                 public void locked(Object node, List<String> ids) {
101
102                 }
103
104                 @Override
105                 public void stolen(Object node, List<String> ids) {
106
107                 }
108             };
109             this.rpcCallback = temp;
110             rpc.registerCallback(temp);
111         }
112     }
113
114
115     protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
116         //todo(ashwin): we should move all the JSON parsing logic to a utility class
117         if (tableUpdatesJson instanceof ObjectNode) {
118             Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
119             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
120             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
121                 Map.Entry<String, JsonNode> entry = itr.next();
122
123                 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
124                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
125                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
126
127             }
128             return new TableUpdates(tableUpdateMap);
129         }
130         return null;
131     }
132
133     @Override
134     public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
135
136         //todo, we may not need transactionbuilder if we can have JSON objects
137         TransactBuilder builder = new TransactBuilder(dbSchema);
138         for (Operation operation : operations) {
139             builder.addOperation(operation);
140         }
141
142         return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
143     }
144
145     @Override
146     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
147                                                             List<MonitorRequest> monitorRequest,
148                                                             final MonitorCallBack callback) {
149
150         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
151                 new Function<MonitorRequest, String>() {
152                     @Override
153                     public String apply(MonitorRequest input) {
154                         return input.getTableName();
155                     }
156                 });
157
158         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
159         registerCallback(monitorHandle, callback, dbSchema);
160
161         ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
162             @Override
163             public List<Object> params() {
164                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
165             }
166         });
167         JsonNode result;
168         try {
169             result = monitor.get();
170         } catch (InterruptedException | ExecutionException e) {
171             return null;
172         }
173         return transformingCallback(result, dbSchema);
174     }
175
176     @Override
177     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
178                                                            List<MonitorRequest> monitorRequest,
179                                                            final MonitorHandle monitorHandle,
180                                                            final MonitorCallBack callback) {
181
182         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
183                 new Function<MonitorRequest, String>() {
184                     @Override
185                     public String apply(MonitorRequest input) {
186                         return input.getTableName();
187                     }
188                 });
189
190         registerCallback(monitorHandle, callback, dbSchema);
191
192         ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
193             @Override
194             public List<Object> params() {
195                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
196             }
197         });
198         JsonNode result;
199         try {
200             result = monitor.get();
201         } catch (InterruptedException | ExecutionException e) {
202             return null;
203         }
204         return transformingCallback(result, dbSchema);
205     }
206
207     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
208         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
209         setupUpdateListener();
210     }
211
212     @Override
213     public void cancelMonitor(final MonitorHandle handler) {
214         ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(new Params() {
215             @Override
216             public List<Object> params() {
217                 return Lists.<Object>newArrayList(handler.getId());
218             }
219         });
220
221         JsonNode result = null;
222         try {
223             result = cancelMonitor.get();
224         } catch (InterruptedException | ExecutionException e) {
225             LOG.error("Exception when canceling monitor handler {}", handler.getId());
226         }
227
228         if (result == null) {
229             LOG.error("Fail to cancel monitor with handler {}", handler.getId());
230         } else {
231             LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
232         }
233     }
234
235     @Override
236     public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
237         throw new UnsupportedOperationException("not yet implemented");
238     }
239
240     @Override
241     public ListenableFuture<Boolean> steal(String lockId) {
242         throw new UnsupportedOperationException("not yet implemented");
243     }
244
245     @Override
246     public ListenableFuture<Boolean> unLock(String lockId) {
247         throw new UnsupportedOperationException("not yet implemented");
248     }
249
250     @Override
251     public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
252         throw new UnsupportedOperationException("not yet implemented");
253     }
254
255     @Override
256     public void stopEchoService() {
257         throw new UnsupportedOperationException("not yet implemented");
258     }
259
260     @Override
261     public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
262         return new TransactionBuilder(this, dbSchema);
263     }
264
265
266     public boolean isReady(int timeout) throws InterruptedException {
267         while (timeout > 0) {
268             if (!schema.isEmpty()) {
269                 return true;
270             }
271             Thread.sleep(1000);
272             timeout--;
273         }
274         return false;
275     }
276
277     @Override
278     public ListenableFuture<List<String>> getDatabases() {
279         return rpc.list_dbs();
280     }
281
282     @Override
283     public ListenableFuture<DatabaseSchema> getSchema(final String database) {
284
285         DatabaseSchema databaseSchema = schema.get(database);
286
287         if (databaseSchema == null) {
288             return Futures.transform(
289                     getSchemaFromDevice(Lists.newArrayList(database)),
290                     new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
291                         @Override
292                         public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
293                             if (result.containsKey(database)) {
294                                 DatabaseSchema dbSchema = result.get(database);
295                                 dbSchema.populateInternallyGeneratedColumns();
296                                 OvsdbClientImpl.this.schema.put(database, dbSchema);
297                                 return dbSchema;
298                             } else {
299                                 return null;
300                             }
301                         }
302                     }, executorService);
303         } else {
304             return Futures.immediateFuture(databaseSchema);
305         }
306     }
307
308     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
309         Map<String, DatabaseSchema> schema = Maps.newHashMap();
310         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
311         populateSchema(dbNames, schema, future);
312         return future;
313     }
314
315     private void populateSchema(final List<String> dbNames,
316                                  final Map<String, DatabaseSchema> schema,
317                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
318
319         if (dbNames == null || dbNames.isEmpty()) {
320             return;
321         }
322
323         Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
324                 new com.google.common.base.Function<JsonNode, Void>() {
325                     @Override
326                     public Void apply(JsonNode jsonNode) {
327                         try {
328                             schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
329                             if (schema.size() > 1 && !sfuture.isCancelled()) {
330                                 populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
331                             } else if (schema.size() == 1) {
332                                 sfuture.set(schema);
333                             }
334                         } catch (Exception e) {
335                             sfuture.setException(e);
336                         }
337                         return null;
338                     }
339                 });
340     }
341
342     public void setRpc(OvsdbRPC rpc) {
343         this.rpc = rpc;
344     }
345
346     static class CallbackContext {
347         MonitorCallBack monitorCallBack;
348         DatabaseSchema schema;
349
350         CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
351             this.monitorCallBack = monitorCallBack;
352             this.schema = schema;
353         }
354     }
355
356     @Override
357     public DatabaseSchema getDatabaseSchema(String dbName) {
358         return schema.get(dbName);
359     }
360
361     /**
362      * This method finds the DatabaseSchema that matches a given Typed Table Class.
363      * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
364      * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
365      *
366      * @param klazz Typed Class that represents a Table
367      * @return DatabaseSchema that matches a Typed Table Class
368      */
369     private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
370         TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
371         if (typedTable != null) {
372             return this.getDatabaseSchema(typedTable.database());
373         }
374         return null;
375     }
376
377     /**
378      * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
379      * given the Typed Table Class
380      *
381      * @param klazz Typed Interface
382      * @return Proxy wrapper for the actual raw Row class.
383      */
384     @Override
385     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
386         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
387         return this.createTypedRowWrapper(dbSchema, klazz);
388     }
389
390     /**
391      * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
392      * DatabaseSchema and Typed Table Class.
393      *
394      * @param dbSchema Database Schema of interest
395      * @param klazz Typed Interface
396      * @return Proxy wrapper for the actual raw Row class.
397      */
398     @Override
399     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
400         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<GenericTableSchema>());
401     }
402
403     /**
404      * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
405      *
406      * @param klazz Typed Interface
407      * @param row The actual Row that the wrapper is operating on.
408      *            It can be null if the caller is just interested in getting ColumnSchema.
409      * @return Proxy wrapper for the actual raw Row class.
410      */
411     @Override
412
413     public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
414         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
415         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
416     }
417
418     @Override
419     public OvsdbConnectionInfo getConnectionInfo() {
420         return connectionInfo;
421     }
422
423     @Override
424     public boolean isActive() {
425         return channel.isActive();
426     }
427
428     @Override
429     public void disconnect() {
430         channel.disconnect();
431     }
432 }