Modified the library to be friendlier towards the uuid value obtained through...
[ovsdb.git] / library / src / main / java / org / opendaylight / ovsdb / lib / OvsDBClientImpl.java
1 /*
2  * Copyright (C) 2014 EBay Software Foundation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Ashwin Raveendran
9  */
10 package org.opendaylight.ovsdb.lib;
11
12 import java.util.HashMap;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Queue;
17 import java.util.UUID;
18 import java.util.concurrent.ExecutorService;
19
20 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
21 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
22 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
23 import org.opendaylight.ovsdb.lib.message.TableUpdate;
24 import org.opendaylight.ovsdb.lib.message.TableUpdates;
25 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
26 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
27 import org.opendaylight.ovsdb.lib.operations.Operation;
28 import org.opendaylight.ovsdb.lib.operations.OperationResult;
29 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
30 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
31 import org.opendaylight.ovsdb.lib.schema.TableSchema;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import com.fasterxml.jackson.databind.JsonNode;
36 import com.fasterxml.jackson.databind.node.ObjectNode;
37 import com.google.common.base.Function;
38 import com.google.common.collect.ImmutableMap;
39 import com.google.common.collect.Lists;
40 import com.google.common.collect.Maps;
41 import com.google.common.util.concurrent.FutureCallback;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.ListenableFuture;
44 import com.google.common.util.concurrent.SettableFuture;
45
46
47 public class OvsDBClientImpl implements OvsDBClient {
48
49     protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
50     private ExecutorService executorService;
51     private OvsdbRPC rpc;
52     private Map<String, DatabaseSchema> schema = Maps.newHashMap();
53     private HashMap<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
54     private Queue<Throwable> exceptions;
55     private OvsdbRPC.Callback rpcCallback;
56
57     public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
58         this.rpc = rpc;
59         this.executorService = executorService;
60     }
61
62     OvsDBClientImpl() {
63     }
64
65     void setupUpdateListener() {
66         if (rpcCallback == null) {
67             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
68                 @Override
69                 public void update(Object node, UpdateNotification upadateNotification) {
70                     Object key = upadateNotification.getContext();
71                     CallbackContext callbackContext = monitorCallbacks.get(key);
72                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
73                     if (monitorCallBack == null) {
74                         //ignore ?
75                         logger.info("callback received with context {}, but no known handler. Ignoring!", key);
76                         return;
77                     }
78                     _transformingCallback(upadateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
79                 }
80
81                 @Override
82                 public void locked(Object node, List<String> ids) {
83
84                 }
85
86                 @Override
87                 public void stolen(Object node, List<String> ids) {
88
89                 }
90             };
91             this.rpcCallback = temp;
92             rpc.registerCallback(temp);
93         }
94     }
95
96
97     protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
98         //todo(ashwin): we should move all the JSON parsing logic to a utility class
99         if (tableUpdatesJson instanceof ObjectNode) {
100             Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
101             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
102             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();){
103                 Map.Entry<String, JsonNode> entry = itr.next();
104
105                 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
106                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
107                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
108
109             }
110             TableUpdates updates = new TableUpdates(tableUpdateMap);
111             monitorCallBack.update(updates);
112         }
113     }
114
115     @Override
116     public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
117
118         //todo, we may not need transactionbuilder if we can have JSON objects
119         TransactBuilder builder = new TransactBuilder();
120         for (Operation o : operations) {
121             builder.addOperation(o);
122         }
123
124         return rpc.transact(builder);
125     }
126
127     @Override
128     public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
129                                                             List<MonitorRequest<E>> monitorRequest,
130                                                             final MonitorCallBack callback) {
131
132         final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
133                 new Function<MonitorRequest<E>, String>() {
134                     @Override
135                     public String apply(MonitorRequest<E> input) {
136                         return input.getTableName();
137                     }
138                 });
139
140         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
141         registerCallback(monitorHandle, callback, dbSchema);
142
143         ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
144             @Override
145             public List<Object> params() {
146                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
147             }
148         });
149         Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
150             @Override
151             public void onSuccess(JsonNode result) {
152                 _transformingCallback(result, callback, dbSchema);
153             }
154
155             @Override
156             public void onFailure(Throwable t) {
157                 callback.exception(t);
158             }
159         });
160
161         return monitorHandle;
162     }
163
164     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
165         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
166         setupUpdateListener();
167     }
168
169     @Override
170     public void cancelMonitor(MonitorHandle handler) {
171         throw new UnsupportedOperationException("not yet implemented");
172     }
173
174     @Override
175     public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
176         throw new UnsupportedOperationException("not yet implemented");
177     }
178
179     @Override
180     public ListenableFuture<Boolean> steal(String lockId) {
181         throw new UnsupportedOperationException("not yet implemented");
182     }
183
184     @Override
185     public ListenableFuture<Boolean> unLock(String lockId) {
186         throw new UnsupportedOperationException("not yet implemented");
187     }
188
189     @Override
190     public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
191         throw new UnsupportedOperationException("not yet implemented");
192     }
193
194     @Override
195     public void stopEchoService() {
196         throw new UnsupportedOperationException("not yet implemented");
197     }
198
199     @Override
200     public TransactionBuilder transactBuilder() {
201         return new TransactionBuilder(this);
202     }
203
204
205     public boolean isReady(int timeout) throws InterruptedException {
206         while (timeout > 0) {
207             if (!schema.isEmpty()) {
208                 return true;
209             }
210             Thread.sleep(1000);
211             timeout--;
212         }
213         return false;
214     }
215
216     @Override
217     public ListenableFuture<List<String>> getDatabases() {
218         return rpc.list_dbs();
219     }
220
221     @Override
222     public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
223
224         DatabaseSchema databaseSchema = schema.get(database);
225
226         if (databaseSchema == null || cacheResult) {
227             return Futures.transform(
228                     getSchemaFromDevice(Lists.newArrayList(database)),
229                     new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
230                         @Override
231                         public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
232                             if (result.containsKey(database)) {
233                                 DatabaseSchema s = result.get(database);
234                                 s.populateInternallyGeneratedColumns();
235                                 if (cacheResult) {
236                                     OvsDBClientImpl.this.schema.put(database, s);
237                                 }
238                                 return s;
239                             } else {
240                                 return null;
241                             }
242                         }
243                     }, executorService);
244
245
246         } else {
247             return Futures.immediateFuture(databaseSchema);
248         }
249     }
250
251     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
252         Map<String, DatabaseSchema> schema = Maps.newHashMap();
253         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
254         _populateSchema(dbNames, schema, future);
255         return future;
256     }
257
258     private void _populateSchema(final List<String> dbNames,
259                                  final Map<String, DatabaseSchema> schema,
260                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
261
262         if (dbNames == null || dbNames.isEmpty()) {
263             return;
264         }
265
266         Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
267                 new com.google.common.base.Function<JsonNode, Void>() {
268                     @Override
269                     public Void apply(JsonNode jsonNode) {
270                         try {
271                             schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
272                             if (schema.size() > 1 && !sfuture.isCancelled()) {
273                                 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
274                             } else if (schema.size() == 1) {
275                                 sfuture.set(schema);
276                             }
277                         } catch (Throwable e) {
278                             sfuture.setException(e);
279                         }
280                         return null;
281                     }
282                 });
283     }
284
285     public void setRpc(OvsdbRPC rpc) {
286         this.rpc = rpc;
287     }
288
289     public Queue<Throwable> getExceptions() {
290         return exceptions;
291     }
292
293     static class CallbackContext {
294         MonitorCallBack monitorCallBack;
295         DatabaseSchema schema;
296
297         CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
298             this.monitorCallBack = monitorCallBack;
299             this.schema = schema;
300         }
301     }
302 }