Remove use of deprecated Guava methods
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbClientImpl.java
1 /*
2  * Copyright © 2014, 2017 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.lib.impl;
10
11 import com.fasterxml.jackson.databind.JsonNode;
12 import com.fasterxml.jackson.databind.node.ObjectNode;
13 import com.google.common.base.Function;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Maps;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import io.netty.channel.Channel;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.UUID;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
36 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
37 import org.opendaylight.ovsdb.lib.LockStolenCallback;
38 import org.opendaylight.ovsdb.lib.MonitorCallBack;
39 import org.opendaylight.ovsdb.lib.MonitorHandle;
40 import org.opendaylight.ovsdb.lib.OvsdbClient;
41 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
42 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
43 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
44 import org.opendaylight.ovsdb.lib.error.ParsingException;
45 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
46 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
47 import org.opendaylight.ovsdb.lib.message.TableUpdate;
48 import org.opendaylight.ovsdb.lib.message.TableUpdates;
49 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
50 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
51 import org.opendaylight.ovsdb.lib.notation.Row;
52 import org.opendaylight.ovsdb.lib.operations.Operation;
53 import org.opendaylight.ovsdb.lib.operations.OperationResult;
54 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
55 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
56 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
57 import org.opendaylight.ovsdb.lib.schema.TableSchema;
58 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
59 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
60 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64
65 public class OvsdbClientImpl implements OvsdbClient {
66
67     private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
68     private ExecutorService executorService;
69     private OvsdbRPC rpc;
70     private Map<String, DatabaseSchema> schemas = new HashMap<>();
71     private Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
72     private OvsdbRPC.Callback rpcCallback;
73     private OvsdbConnectionInfo connectionInfo;
74     private Channel channel;
75     private boolean isConnectionPublished;
76     private static final int NO_TIMEOUT = -1;
77
78     private static final ThreadFactory THREAD_FACTORY_SSL =
79         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
80     private static final ThreadFactory THREAD_FACTORY_NON_SSL =
81         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
82
83     public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type,
84         SocketConnectionType socketConnType) {
85         this.rpc = rpc;
86         ThreadFactory threadFactory =
87             getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
88         this.executorService = Executors.newCachedThreadPool(threadFactory);
89         this.channel = channel;
90         this.connectionInfo = new OvsdbConnectionInfo(channel, type);
91     }
92
93     /**
94      * Generate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
95      * @param type ACTIVE or PASSIVE {@link ConnectionType}
96      * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
97      * @param executorNameArgs Additional args to append to thread name format
98      * @return {@link ThreadFactory}
99      */
100     private ThreadFactory getThreadFactory(ConnectionType type,
101         SocketConnectionType socketConnType, String executorNameArgs) {
102         if (type == ConnectionType.PASSIVE) {
103             switch (socketConnType) {
104                 case SSL:
105                     return THREAD_FACTORY_SSL;
106                 case NON_SSL:
107                     return THREAD_FACTORY_NON_SSL;
108                 default:
109                     return Executors.defaultThreadFactory();
110             }
111         } else if (type == ConnectionType.ACTIVE) {
112             ThreadFactory threadFactorySSL =
113                 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConn-" + executorNameArgs + "-%d")
114                     .build();
115             return threadFactorySSL;
116         }
117         // Default case
118         return Executors.defaultThreadFactory();
119     }
120
121     OvsdbClientImpl() {
122     }
123
124     void setupUpdateListener() {
125         if (rpcCallback == null) {
126             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
127                 @Override
128                 public void update(Object node, UpdateNotification updateNotification) {
129                     String key = updateNotification.getContext();
130                     CallbackContext callbackContext = monitorCallbacks.get(key);
131                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
132                     if (monitorCallBack == null) {
133                         //ignore ?
134                         LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
135                         return;
136                     }
137                     TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
138                             callbackContext.schema);
139                     monitorCallBack.update(updates, callbackContext.schema);
140                 }
141
142                 @Override
143                 public void locked(Object node, List<String> ids) {
144
145                 }
146
147                 @Override
148                 public void stolen(Object node, List<String> ids) {
149
150                 }
151             };
152             this.rpcCallback = temp;
153             rpc.registerCallback(temp);
154         }
155     }
156
157
158     protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
159         //todo(ashwin): we should move all the JSON parsing logic to a utility class
160         if (tableUpdatesJson instanceof ObjectNode) {
161             Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
162             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
163             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
164                 Map.Entry<String, JsonNode> entry = itr.next();
165
166                 DatabaseSchema databaseSchema = this.schemas.get(dbSchema.getName());
167                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
168                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
169
170             }
171             return new TableUpdates(tableUpdateMap);
172         }
173         return null;
174     }
175
176     @Override
177     public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
178
179         //todo, we may not need transactionbuilder if we can have JSON objects
180         TransactBuilder builder = new TransactBuilder(dbSchema);
181         for (Operation operation : operations) {
182             builder.addOperation(operation);
183         }
184
185         return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
186     }
187
188     @Override
189     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
190                                                            List<MonitorRequest> monitorRequest,
191                                                            final MonitorCallBack callback) {
192         return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
193     }
194
195     @Override
196     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
197                                                             List<MonitorRequest> monitorRequest,
198                                                             final MonitorCallBack callback,
199                                                             int timeout) {
200
201         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
202                 MonitorRequest::getTableName);
203
204         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
205         registerCallback(monitorHandle, callback, dbSchema);
206
207         ListenableFuture<JsonNode> monitor = rpc.monitor(
208             () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
209         JsonNode result;
210         try {
211             if (timeout == NO_TIMEOUT) {
212                 result = monitor.get();
213             } else {
214                 result = monitor.get(timeout, TimeUnit.SECONDS);
215             }
216         } catch (InterruptedException | ExecutionException | TimeoutException e) {
217             LOG.warn("Failed to monitor {}", dbSchema, e);
218             return null;
219         }
220         return transformingCallback(result, dbSchema);
221     }
222
223     @Override
224     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
225                                                            List<MonitorRequest> monitorRequest,
226                                                            final MonitorHandle monitorHandle,
227                                                            final MonitorCallBack callback) {
228         return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
229     }
230
231     @Override
232     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
233                                                            List<MonitorRequest> monitorRequest,
234                                                            final MonitorHandle monitorHandle,
235                                                            final MonitorCallBack callback,
236                                                            int timeout) {
237
238         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
239                 MonitorRequest::getTableName);
240
241         registerCallback(monitorHandle, callback, dbSchema);
242
243         ListenableFuture<JsonNode> monitor = rpc.monitor(
244             () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
245         JsonNode result;
246         try {
247             if (timeout == NO_TIMEOUT) {
248                 result = monitor.get();
249             } else {
250                 result = monitor.get(timeout, TimeUnit.SECONDS);
251             }
252         } catch (InterruptedException | ExecutionException | TimeoutException e) {
253             LOG.warn("Failed to monitor {}", dbSchema, e);
254             return null;
255         }
256         return transformingCallback(result, dbSchema);
257     }
258
259     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
260         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
261         setupUpdateListener();
262     }
263
264     @Override
265     public void cancelMonitor(final MonitorHandle handler) {
266         cancelMonitor(handler, NO_TIMEOUT);
267     }
268
269     @Override
270     public void cancelMonitor(final MonitorHandle handler, int timeout) {
271         ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
272
273         JsonNode result = null;
274         try {
275             if (timeout == NO_TIMEOUT) {
276                 result = cancelMonitor.get();
277             } else {
278                 result = cancelMonitor.get(timeout, TimeUnit.SECONDS);
279             }
280         } catch (InterruptedException | ExecutionException | TimeoutException e) {
281             LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
282         }
283
284         if (result == null) {
285             LOG.error("Fail to cancel monitor with handler {}", handler.getId());
286         } else {
287             LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
288         }
289     }
290
291     @Override
292     public ListenableFuture<List<String>> echo() {
293         return rpc.echo();
294     }
295
296     @Override
297     public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
298         throw new UnsupportedOperationException("not yet implemented");
299     }
300
301     @Override
302     public ListenableFuture<Boolean> steal(String lockId) {
303         throw new UnsupportedOperationException("not yet implemented");
304     }
305
306     @Override
307     public ListenableFuture<Boolean> unLock(String lockId) {
308         throw new UnsupportedOperationException("not yet implemented");
309     }
310
311     @Override
312     public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
313         throw new UnsupportedOperationException("not yet implemented");
314     }
315
316     @Override
317     public void stopEchoService() {
318         throw new UnsupportedOperationException("not yet implemented");
319     }
320
321     @Override
322     public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
323         return new TransactionBuilder(this, dbSchema);
324     }
325
326
327     public boolean isReady(int timeout) throws InterruptedException {
328         while (timeout > 0) {
329             if (!schemas.isEmpty()) {
330                 return true;
331             }
332             Thread.sleep(1000);
333             timeout--;
334         }
335         return false;
336     }
337
338     @Override
339     public ListenableFuture<List<String>> getDatabases() {
340         return rpc.list_dbs();
341     }
342
343     @Override
344     public ListenableFuture<DatabaseSchema> getSchema(final String database) {
345
346         DatabaseSchema databaseSchema = schemas.get(database);
347
348         if (databaseSchema == null) {
349             return Futures.transform(
350                 getSchemaFromDevice(Collections.singletonList(database)),
351                 (Function<Map<String, DatabaseSchema>, DatabaseSchema>) result -> {
352                     if (result.containsKey(database)) {
353                         DatabaseSchema dbSchema = result.get(database);
354                         dbSchema.populateInternallyGeneratedColumns();
355                         OvsdbClientImpl.this.schemas.put(database, dbSchema);
356                         return dbSchema;
357                     } else {
358                         return null;
359                     }
360                 }, executorService);
361         } else {
362             return Futures.immediateFuture(databaseSchema);
363         }
364     }
365
366     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
367         Map<String, DatabaseSchema> schema = new HashMap<>();
368         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
369         populateSchema(dbNames, schema, future);
370         return future;
371     }
372
373     private void populateSchema(final List<String> dbNames,
374                                  final Map<String, DatabaseSchema> schema,
375                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
376
377         if (dbNames == null || dbNames.isEmpty()) {
378             return;
379         }
380
381         Futures.transform(rpc.get_schema(Collections.singletonList(dbNames.get(0))), jsonNode -> {
382             try {
383                 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
384                 if (schema.size() > 1 && !sfuture.isCancelled()) {
385                     populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
386                 } else if (schema.size() == 1) {
387                     sfuture.set(schema);
388                 }
389             } catch (ParsingException e) {
390                 LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
391                 sfuture.setException(e);
392             }
393             return null;
394         }, MoreExecutors.directExecutor());
395     }
396
397     public void setRpc(OvsdbRPC rpc) {
398         this.rpc = rpc;
399     }
400
401     static class CallbackContext {
402         MonitorCallBack monitorCallBack;
403         DatabaseSchema schema;
404
405         CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
406             this.monitorCallBack = monitorCallBack;
407             this.schema = schema;
408         }
409     }
410
411     @Override
412     public DatabaseSchema getDatabaseSchema(String dbName) {
413         return schemas.get(dbName);
414     }
415
416     /**
417      * This method finds the DatabaseSchema that matches a given Typed Table Class.
418      * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
419      * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
420      *
421      * @param klazz Typed Class that represents a Table
422      * @return DatabaseSchema that matches a Typed Table Class
423      */
424     private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
425         TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
426         if (typedTable != null) {
427             return this.getDatabaseSchema(typedTable.database());
428         }
429         return null;
430     }
431
432     /**
433      * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
434      * given the Typed Table Class.
435      *
436      * @param klazz Typed Interface
437      * @return Proxy wrapper for the actual raw Row class.
438      */
439     @Override
440     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
441         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
442         return this.createTypedRowWrapper(dbSchema, klazz);
443     }
444
445     /**
446      * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
447      * DatabaseSchema and Typed Table Class.
448      *
449      * @param dbSchema Database Schema of interest
450      * @param klazz Typed Interface
451      * @return Proxy wrapper for the actual raw Row class.
452      */
453     @Override
454     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
455         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<>());
456     }
457
458     /**
459      * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
460      *
461      * @param klazz Typed Interface
462      * @param row The actual Row that the wrapper is operating on.
463      *            It can be null if the caller is just interested in getting ColumnSchema.
464      * @return Proxy wrapper for the actual raw Row class.
465      */
466     @Override
467
468     public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
469         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
470         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
471     }
472
473     @Override
474     public OvsdbConnectionInfo getConnectionInfo() {
475         return connectionInfo;
476     }
477
478     @Override
479     public boolean isActive() {
480         return channel.isActive();
481     }
482
483     @Override
484     public void disconnect() {
485         channel.disconnect();
486         executorService.shutdown();
487     }
488
489     @Override
490     public boolean isConnectionPublished() {
491         return isConnectionPublished;
492     }
493
494     @Override
495     public void setConnectionPublished(boolean connectionPublished) {
496         isConnectionPublished = connectionPublished;
497     }
498 }