Merge "explicitly parsing JsonNode for updates" into topic/schema
[ovsdb.git] / library / src / main / java / org / opendaylight / ovsdb / lib / OvsDBClientImpl.java
1 /*
2  * Copyright (C) 2014 EBay Software Foundation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Ashwin Raveendran
9  */
10 package org.opendaylight.ovsdb.lib;
11
12 import com.fasterxml.jackson.databind.JsonNode;
13 import com.fasterxml.jackson.databind.node.ObjectNode;
14 import com.google.common.base.Function;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.collect.Lists;
17 import com.google.common.collect.Maps;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
23 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
24 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
25 import org.opendaylight.ovsdb.lib.message.TableUpdate;
26 import org.opendaylight.ovsdb.lib.message.TableUpdates;
27 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
28 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
29 import org.opendaylight.ovsdb.lib.operations.Operation;
30 import org.opendaylight.ovsdb.lib.operations.OperationResult;
31 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
32 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
33 import org.opendaylight.ovsdb.lib.schema.TableSchema;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.util.HashMap;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Queue;
42 import java.util.UUID;
43 import java.util.concurrent.ExecutorService;
44
45
46 public class OvsDBClientImpl implements OvsDBClient {
47
48     protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
49     private ExecutorService executorService;
50     private OvsdbRPC rpc;
51     private Map<String, DatabaseSchema> schema = Maps.newHashMap();
52     private HashMap<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
53     private Queue<Throwable> exceptions;
54     private OvsdbRPC.Callback rpcCallback;
55
56     public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
57         this.rpc = rpc;
58         this.executorService = executorService;
59     }
60
61     OvsDBClientImpl() {
62     }
63
64     void setupUpdateListener() {
65         if (rpcCallback == null) {
66             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
67                 @Override
68                 public void update(Object node, UpdateNotification upadateNotification) {
69                     Object key = upadateNotification.getContext();
70                     CallbackContext callbackContext = monitorCallbacks.get(key);
71                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
72                     if (monitorCallBack == null) {
73                         //ignore ?
74                         logger.info("callback received with context {}, but no known handler. Ignoring!", key);
75                         return;
76                     }
77                     _transformingCallback(upadateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
78                 }
79
80                 @Override
81                 public void locked(Object node, List<String> ids) {
82
83                 }
84
85                 @Override
86                 public void stolen(Object node, List<String> ids) {
87
88                 }
89             };
90             this.rpcCallback = temp;
91             rpc.registerCallback(temp);
92         }
93     }
94
95
96     protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
97         //todo(ashwin): we should move all the JSON parsing logic to a utility class
98         if (tableUpdatesJson instanceof ObjectNode) {
99             Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
100             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
101             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();){
102                 Map.Entry<String, JsonNode> entry = itr.next();
103
104                 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
105                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
106                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
107
108             }
109             TableUpdates updates = new TableUpdates(tableUpdateMap);
110             monitorCallBack.update(updates);
111         }
112     }
113
114     @Override
115     public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
116
117         //todo, we may not need transactionbuilder if we can have JSON objects
118         TransactBuilder builder = new TransactBuilder();
119         for (Operation o : operations) {
120             builder.addOperation(o);
121         }
122
123         return rpc.transact(builder);
124     }
125
126     @Override
127     public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
128                                                             List<MonitorRequest<E>> monitorRequest,
129                                                             final MonitorCallBack callback) {
130
131         final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
132                 new Function<MonitorRequest<E>, String>() {
133                     @Override
134                     public String apply(MonitorRequest<E> input) {
135                         return input.getTableName();
136                     }
137                 });
138
139         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
140         registerCallback(monitorHandle, callback, dbSchema);
141
142         ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
143             @Override
144             public List<Object> params() {
145                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
146             }
147         });
148         Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
149             @Override
150             public void onSuccess(JsonNode result) {
151                 _transformingCallback(result, callback, dbSchema);
152             }
153
154             @Override
155             public void onFailure(Throwable t) {
156                 callback.exception(t);
157             }
158         });
159
160         return monitorHandle;
161     }
162
163     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
164         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
165         setupUpdateListener();
166     }
167
168     @Override
169     public void cancelMonitor(MonitorHandle handler) {
170         throw new UnsupportedOperationException("not yet implemented");
171     }
172
173     @Override
174     public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
175         throw new UnsupportedOperationException("not yet implemented");
176     }
177
178     @Override
179     public ListenableFuture<Boolean> steal(String lockId) {
180         throw new UnsupportedOperationException("not yet implemented");
181     }
182
183     @Override
184     public ListenableFuture<Boolean> unLock(String lockId) {
185         throw new UnsupportedOperationException("not yet implemented");
186     }
187
188     @Override
189     public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
190         throw new UnsupportedOperationException("not yet implemented");
191     }
192
193     @Override
194     public void stopEchoService() {
195         throw new UnsupportedOperationException("not yet implemented");
196     }
197
198     @Override
199     public TransactionBuilder transactBuilder() {
200         return new TransactionBuilder(this);
201     }
202
203
204     public boolean isReady(int timeout) throws InterruptedException {
205         while (timeout > 0) {
206             if (!schema.isEmpty()) {
207                 return true;
208             }
209             Thread.sleep(1000);
210             timeout--;
211         }
212         return false;
213     }
214
215     @Override
216     public ListenableFuture<List<String>> getDatabases() {
217         return rpc.list_dbs();
218     }
219
220     @Override
221     public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
222
223         DatabaseSchema databaseSchema = schema.get(database);
224
225         if (databaseSchema == null || cacheResult) {
226             return Futures.transform(
227                     getSchemaFromDevice(Lists.newArrayList(database)),
228                     new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
229                         @Override
230                         public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
231                             if (result.containsKey(database)) {
232                                 DatabaseSchema s = result.get(database);
233                                 if (cacheResult) {
234                                     OvsDBClientImpl.this.schema.put(database, s);
235                                 }
236                                 return s;
237                             } else {
238                                 return null;
239                             }
240                         }
241                     }, executorService);
242
243
244         } else {
245             return Futures.immediateFuture(databaseSchema);
246         }
247     }
248
249     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
250         Map<String, DatabaseSchema> schema = Maps.newHashMap();
251         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
252         _populateSchema(dbNames, schema, future);
253         return future;
254     }
255
256     private void _populateSchema(final List<String> dbNames,
257                                  final Map<String, DatabaseSchema> schema,
258                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
259
260         if (dbNames == null || dbNames.isEmpty()) {
261             return;
262         }
263
264         Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
265                 new com.google.common.base.Function<JsonNode, Void>() {
266                     @Override
267                     public Void apply(JsonNode jsonNode) {
268                         try {
269                             schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
270                             if (schema.size() > 1 && !sfuture.isCancelled()) {
271                                 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
272                             } else if (schema.size() == 1) {
273                                 sfuture.set(schema);
274                             }
275                         } catch (Throwable e) {
276                             sfuture.setException(e);
277                         }
278                         return null;
279                     }
280                 });
281     }
282
283     public void setRpc(OvsdbRPC rpc) {
284         this.rpc = rpc;
285     }
286
287     public Queue<Throwable> getExceptions() {
288         return exceptions;
289     }
290
291     static class CallbackContext {
292         MonitorCallBack monitorCallBack;
293         DatabaseSchema schema;
294
295         CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
296             this.monitorCallBack = monitorCallBack;
297             this.schema = schema;
298         }
299     }
300 }