Stop isReady method always returning true
[ovsdb.git] / library / src / main / java / org / opendaylight / ovsdb / lib / OvsDBClientImpl.java
1 /*
2  * Copyright (C) 2014 EBay Software Foundation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Ashwin Raveendran
9  */
10 package org.opendaylight.ovsdb.lib;
11
12 import com.fasterxml.jackson.databind.JsonNode;
13 import com.google.common.base.Function;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Lists;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
22 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
23 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
24 import org.opendaylight.ovsdb.lib.message.TableUpdates;
25 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
26 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
27 import org.opendaylight.ovsdb.lib.operations.Operation;
28 import org.opendaylight.ovsdb.lib.operations.OperationResult;
29 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
30 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
31 import org.opendaylight.ovsdb.lib.schema.TableSchema;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Queue;
39 import java.util.UUID;
40 import java.util.concurrent.ExecutorService;
41
42
43 public class OvsDBClientImpl implements OvsDBClient {
44
45     protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
46     private ExecutorService executorService;
47     private OvsdbRPC rpc;
48     private Map<String, DatabaseSchema> schema = Maps.newHashMap();
49     private HashMap<String, MonitorCallBack> monitorCallbacks = Maps.newHashMap();
50     private Queue<Throwable> exceptions;
51     private OvsdbRPC.Callback rpcCallback;
52
53     public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
54         this.rpc = rpc;
55         this.executorService = executorService;
56     }
57
58     OvsDBClientImpl() {
59     }
60
61     void setupUpdateListner() {
62         if (rpcCallback == null) {
63             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
64                 @Override
65                 public void update(Object node, UpdateNotification upadateNotification) {
66                     Object key = upadateNotification.getContext();
67                     MonitorCallBack monitorCallBack = monitorCallbacks.get(key);
68                     if (monitorCallBack == null) {
69                         //ignore ?
70                         logger.info("callback received with context {}, but no known handler. Ignoring!", key);
71                         return;
72                     }
73                     monitorCallBack.update(upadateNotification.getUpdate());
74                 }
75
76                 @Override
77                 public void locked(Object node, List<String> ids) {
78
79                 }
80
81                 @Override
82                 public void stolen(Object node, List<String> ids) {
83
84                 }
85             };
86             this.rpcCallback = temp;
87             rpc.registerCallback(temp);
88         }
89     }
90
91     @Override
92     public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
93
94         //todo, we may not need transactionbuilder if we can have JSON objects
95         TransactBuilder builder = new TransactBuilder();
96         for (Operation o : operations) {
97             builder.addOperation(o);
98         }
99
100         return rpc.transact(builder);
101     }
102
103     @Override
104     public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
105                                                             List<MonitorRequest<E>> monitorRequest,
106                                                             final MonitorCallBack callback) {
107
108         final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
109                 new Function<MonitorRequest<E>, String>() {
110                     @Override
111                     public String apply(MonitorRequest<E> input) {
112                         return input.getTableName();
113                     }
114                 });
115
116         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
117         registerCallback(monitorHandle, callback);
118
119         ListenableFuture<TableUpdates> monitor = rpc.monitor(new Params() {
120             @Override
121             public List<Object> params() {
122                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
123             }
124         });
125         Futures.addCallback(monitor, new FutureCallback<TableUpdates>() {
126             @Override
127             public void onSuccess(TableUpdates result) {
128                 callback.update(result);
129             }
130
131             @Override
132             public void onFailure(Throwable t) {
133                 callback.exception(t);
134             }
135         });
136
137         return monitorHandle;
138     }
139
140     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback) {
141         this.monitorCallbacks.put(monitorHandle.getId(), callback);
142         setupUpdateListner();
143     }
144
145     @Override
146     public void cancelMonitor(MonitorHandle handler) {
147         throw new UnsupportedOperationException("not yet implemented");
148     }
149
150     @Override
151     public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
152         throw new UnsupportedOperationException("not yet implemented");
153     }
154
155     @Override
156     public ListenableFuture<Boolean> steal(String lockId) {
157         throw new UnsupportedOperationException("not yet implemented");
158     }
159
160     @Override
161     public ListenableFuture<Boolean> unLock(String lockId) {
162         throw new UnsupportedOperationException("not yet implemented");
163     }
164
165     @Override
166     public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
167         throw new UnsupportedOperationException("not yet implemented");
168     }
169
170     @Override
171     public void stopEchoService() {
172         throw new UnsupportedOperationException("not yet implemented");
173     }
174
175     @Override
176     public TransactionBuilder transactBuilder() {
177         return new TransactionBuilder(this);
178     }
179
180
181     public boolean isReady(int timeout) throws InterruptedException {
182         while (timeout > 0) {
183             if (!schema.isEmpty()) {
184                 return true;
185             }
186             Thread.sleep(1000);
187             timeout--;
188         }
189         return false;
190     }
191
192     @Override
193     public ListenableFuture<List<String>> getDatabases() {
194         return rpc.list_dbs();
195     }
196
197     @Override
198     public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
199
200         DatabaseSchema databaseSchema = schema.get(database);
201
202         if (databaseSchema == null || cacheResult) {
203             return Futures.transform(
204                     getSchemaFromDevice(Lists.newArrayList(database)),
205                     new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
206                         @Override
207                         public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
208                             if (result.containsKey(database)) {
209                                 DatabaseSchema s = result.get(database);
210                                 if (cacheResult) {
211                                     OvsDBClientImpl.this.schema.put(database, s);
212                                 }
213                                 return s;
214                             } else {
215                                 return null;
216                             }
217                         }
218                     }, executorService);
219
220
221         } else {
222             return Futures.immediateFuture(databaseSchema);
223         }
224     }
225
226     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
227         Map<String, DatabaseSchema> schema = Maps.newHashMap();
228         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
229         _populateSchema(dbNames, schema, future);
230         return future;
231     }
232
233     private void _populateSchema(final List<String> dbNames,
234                                  final Map<String, DatabaseSchema> schema,
235                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
236
237         if (dbNames == null || dbNames.isEmpty()) {
238             return;
239         }
240
241         Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
242                 new com.google.common.base.Function<JsonNode, Void>() {
243                     @Override
244                     public Void apply(JsonNode jsonNode) {
245                         try {
246                             schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
247                             if (schema.size() > 1 && !sfuture.isCancelled()) {
248                                 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
249                             } else if (schema.size() == 1) {
250                                 sfuture.set(schema);
251                             }
252                         } catch (Throwable e) {
253                             sfuture.setException(e);
254                         }
255                         return null;
256                     }
257                 });
258     }
259
260     public void setRpc(OvsdbRPC rpc) {
261         this.rpc = rpc;
262     }
263
264     public Queue<Throwable> getExceptions() {
265         return exceptions;
266     }
267
268 }