Simplifying the User facing APIs in TyperUtils.
[ovsdb.git] / library / src / main / java / org / opendaylight / ovsdb / lib / OvsDBClientImpl.java
1 /*
2  * Copyright (C) 2014 EBay Software Foundation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Ashwin Raveendran
9  */
10 package org.opendaylight.ovsdb.lib;
11
12 import java.util.HashMap;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Queue;
17 import java.util.UUID;
18 import java.util.concurrent.ExecutorService;
19
20 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
21 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
22 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
23 import org.opendaylight.ovsdb.lib.message.TableUpdate;
24 import org.opendaylight.ovsdb.lib.message.TableUpdates;
25 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
26 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
27 import org.opendaylight.ovsdb.lib.notation.Row;
28 import org.opendaylight.ovsdb.lib.operations.Operation;
29 import org.opendaylight.ovsdb.lib.operations.OperationResult;
30 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
31 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
32 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
33 import org.opendaylight.ovsdb.lib.schema.TableSchema;
34 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
35 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.fasterxml.jackson.databind.JsonNode;
40 import com.fasterxml.jackson.databind.node.ObjectNode;
41 import com.google.common.base.Function;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Maps;
45 import com.google.common.util.concurrent.FutureCallback;
46 import com.google.common.util.concurrent.Futures;
47 import com.google.common.util.concurrent.ListenableFuture;
48 import com.google.common.util.concurrent.SettableFuture;
49
50
51 public class OvsDBClientImpl implements OvsDBClient {
52
53     protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
54     private ExecutorService executorService;
55     private OvsdbRPC rpc;
56     private Map<String, DatabaseSchema> schema = Maps.newHashMap();
57     private HashMap<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
58     private Queue<Throwable> exceptions;
59     private OvsdbRPC.Callback rpcCallback;
60
61     public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
62         this.rpc = rpc;
63         this.executorService = executorService;
64     }
65
66     OvsDBClientImpl() {
67     }
68
69     void setupUpdateListener() {
70         if (rpcCallback == null) {
71             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
72                 @Override
73                 public void update(Object node, UpdateNotification upadateNotification) {
74                     Object key = upadateNotification.getContext();
75                     CallbackContext callbackContext = monitorCallbacks.get(key);
76                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
77                     if (monitorCallBack == null) {
78                         //ignore ?
79                         logger.info("callback received with context {}, but no known handler. Ignoring!", key);
80                         return;
81                     }
82                     _transformingCallback(upadateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
83                 }
84
85                 @Override
86                 public void locked(Object node, List<String> ids) {
87
88                 }
89
90                 @Override
91                 public void stolen(Object node, List<String> ids) {
92
93                 }
94             };
95             this.rpcCallback = temp;
96             rpc.registerCallback(temp);
97         }
98     }
99
100
101     protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
102         //todo(ashwin): we should move all the JSON parsing logic to a utility class
103         if (tableUpdatesJson instanceof ObjectNode) {
104             Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
105             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
106             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();){
107                 Map.Entry<String, JsonNode> entry = itr.next();
108
109                 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
110                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
111                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
112
113             }
114             TableUpdates updates = new TableUpdates(tableUpdateMap);
115             monitorCallBack.update(updates);
116         }
117     }
118
119     @Override
120     public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
121
122         //todo, we may not need transactionbuilder if we can have JSON objects
123         TransactBuilder builder = new TransactBuilder();
124         for (Operation o : operations) {
125             builder.addOperation(o);
126         }
127
128         return rpc.transact(builder);
129     }
130
131     @Override
132     public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
133                                                             List<MonitorRequest<E>> monitorRequest,
134                                                             final MonitorCallBack callback) {
135
136         final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
137                 new Function<MonitorRequest<E>, String>() {
138                     @Override
139                     public String apply(MonitorRequest<E> input) {
140                         return input.getTableName();
141                     }
142                 });
143
144         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
145         registerCallback(monitorHandle, callback, dbSchema);
146
147         ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
148             @Override
149             public List<Object> params() {
150                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
151             }
152         });
153         Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
154             @Override
155             public void onSuccess(JsonNode result) {
156                 _transformingCallback(result, callback, dbSchema);
157             }
158
159             @Override
160             public void onFailure(Throwable t) {
161                 callback.exception(t);
162             }
163         });
164
165         return monitorHandle;
166     }
167
168     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
169         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
170         setupUpdateListener();
171     }
172
173     @Override
174     public void cancelMonitor(MonitorHandle handler) {
175         throw new UnsupportedOperationException("not yet implemented");
176     }
177
178     @Override
179     public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
180         throw new UnsupportedOperationException("not yet implemented");
181     }
182
183     @Override
184     public ListenableFuture<Boolean> steal(String lockId) {
185         throw new UnsupportedOperationException("not yet implemented");
186     }
187
188     @Override
189     public ListenableFuture<Boolean> unLock(String lockId) {
190         throw new UnsupportedOperationException("not yet implemented");
191     }
192
193     @Override
194     public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
195         throw new UnsupportedOperationException("not yet implemented");
196     }
197
198     @Override
199     public void stopEchoService() {
200         throw new UnsupportedOperationException("not yet implemented");
201     }
202
203     @Override
204     public TransactionBuilder transactBuilder() {
205         return new TransactionBuilder(this);
206     }
207
208
209     public boolean isReady(int timeout) throws InterruptedException {
210         while (timeout > 0) {
211             if (!schema.isEmpty()) {
212                 return true;
213             }
214             Thread.sleep(1000);
215             timeout--;
216         }
217         return false;
218     }
219
220     @Override
221     public ListenableFuture<List<String>> getDatabases() {
222         return rpc.list_dbs();
223     }
224
225     @Override
226     public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
227
228         DatabaseSchema databaseSchema = schema.get(database);
229
230         if (databaseSchema == null || cacheResult) {
231             return Futures.transform(
232                     getSchemaFromDevice(Lists.newArrayList(database)),
233                     new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
234                         @Override
235                         public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
236                             if (result.containsKey(database)) {
237                                 DatabaseSchema s = result.get(database);
238                                 s.populateInternallyGeneratedColumns();
239                                 if (cacheResult) {
240                                     OvsDBClientImpl.this.schema.put(database, s);
241                                 }
242                                 return s;
243                             } else {
244                                 return null;
245                             }
246                         }
247                     }, executorService);
248
249
250         } else {
251             return Futures.immediateFuture(databaseSchema);
252         }
253     }
254
255     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
256         Map<String, DatabaseSchema> schema = Maps.newHashMap();
257         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
258         _populateSchema(dbNames, schema, future);
259         return future;
260     }
261
262     private void _populateSchema(final List<String> dbNames,
263                                  final Map<String, DatabaseSchema> schema,
264                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
265
266         if (dbNames == null || dbNames.isEmpty()) {
267             return;
268         }
269
270         Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
271                 new com.google.common.base.Function<JsonNode, Void>() {
272                     @Override
273                     public Void apply(JsonNode jsonNode) {
274                         try {
275                             schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
276                             if (schema.size() > 1 && !sfuture.isCancelled()) {
277                                 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
278                             } else if (schema.size() == 1) {
279                                 sfuture.set(schema);
280                             }
281                         } catch (Throwable e) {
282                             sfuture.setException(e);
283                         }
284                         return null;
285                     }
286                 });
287     }
288
289     public void setRpc(OvsdbRPC rpc) {
290         this.rpc = rpc;
291     }
292
293     public Queue<Throwable> getExceptions() {
294         return exceptions;
295     }
296
297     static class CallbackContext {
298         MonitorCallBack monitorCallBack;
299         DatabaseSchema schema;
300
301         CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
302             this.monitorCallBack = monitorCallBack;
303             this.schema = schema;
304         }
305     }
306
307     @Override
308     public DatabaseSchema getDatabaseSchema (String dbName) {
309         return schema.get(dbName);
310     }
311
312     /**
313      * This method finds the DatabaseSchema that matches a given Typed Table Class.
314      * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
315      * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
316      *
317      * @param klazz Typed Class that represents a Table
318      * @return DatabaseSchema that matches a Typed Table Class
319      */
320     private <T> DatabaseSchema getDatabaseSchemaForTypedTable (Class <T> klazz) {
321         TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
322         if (typedTable != null) {
323             return this.getDatabaseSchema(typedTable.database());
324         }
325         return null;
326     }
327
328     /**
329      * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
330      * given the Typed Table Class
331      *
332      * @param klazz Typed Interface
333      * @return Proxy wrapper for the actual raw Row class.
334      */
335     @Override
336     public <T> T createTypedRowWrapper(Class<T> klazz) {
337         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
338         return this.createTypedRowWrapper(dbSchema, klazz);
339     }
340
341     /**
342      * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
343      * DatabaseSchema and Typed Table Class.
344      *
345      * @param dbSchema Database Schema of interest
346      * @param klazz Typed Interface
347      * @return Proxy wrapper for the actual raw Row class.
348      */
349     @Override
350     public <T> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
351         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<GenericTableSchema>());
352     }
353
354     /**
355      * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
356      *
357      * @param klazz Typed Interface
358      * @param row The actual Row that the wrapper is operating on. It can be null if the caller is just interested in getting ColumnSchema.
359      * @return Proxy wrapper for the actual raw Row class.
360      */
361     @Override
362     public <T> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
363         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
364         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
365     }
366 }