Eliminate OvsdbClientImpl duplication
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbClientImpl.java
1 /*
2  * Copyright © 2014, 2017 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.lib.impl;
9
10 import com.fasterxml.jackson.databind.JsonNode;
11 import com.fasterxml.jackson.databind.node.ObjectNode;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import io.netty.channel.Channel;
20 import java.util.Arrays;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.UUID;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
34 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
35 import org.opendaylight.ovsdb.lib.LockStolenCallback;
36 import org.opendaylight.ovsdb.lib.MonitorCallBack;
37 import org.opendaylight.ovsdb.lib.MonitorHandle;
38 import org.opendaylight.ovsdb.lib.OvsdbClient;
39 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
40 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
41 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
42 import org.opendaylight.ovsdb.lib.error.ParsingException;
43 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
44 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
45 import org.opendaylight.ovsdb.lib.message.TableUpdate;
46 import org.opendaylight.ovsdb.lib.message.TableUpdates;
47 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
48 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
49 import org.opendaylight.ovsdb.lib.notation.Row;
50 import org.opendaylight.ovsdb.lib.operations.Operation;
51 import org.opendaylight.ovsdb.lib.operations.OperationResult;
52 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
53 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
54 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
55 import org.opendaylight.ovsdb.lib.schema.TableSchema;
56 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
57 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
58 import org.opendaylight.ovsdb.lib.schema.typed.TypedReflections;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 public class OvsdbClientImpl implements OvsdbClient {
63
64     private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
65     private ExecutorService executorService;
66     private OvsdbRPC rpc;
67     private final Map<String, TypedDatabaseSchema> schemas = new HashMap<>();
68     private final Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
69     private OvsdbRPC.Callback rpcCallback;
70     private OvsdbConnectionInfo connectionInfo;
71     private Channel channel;
72     private boolean isConnectionPublished;
73     private static final int NO_TIMEOUT = -1;
74
75     private static final ThreadFactory THREAD_FACTORY_SSL =
76         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
77     private static final ThreadFactory THREAD_FACTORY_NON_SSL =
78         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
79
80     public OvsdbClientImpl(final OvsdbRPC rpc, final Channel channel, final ConnectionType type,
81         final SocketConnectionType socketConnType) {
82         this.rpc = rpc;
83         ThreadFactory threadFactory =
84             getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
85         this.executorService = Executors.newCachedThreadPool(threadFactory);
86         this.channel = channel;
87         this.connectionInfo = new OvsdbConnectionInfo(channel, type);
88     }
89
90     /**
91      * Generate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
92      * @param type ACTIVE or PASSIVE {@link ConnectionType}
93      * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
94      * @param executorNameArgs Additional args to append to thread name format
95      * @return {@link ThreadFactory}
96      */
97     private ThreadFactory getThreadFactory(final ConnectionType type,
98         final SocketConnectionType socketConnType, final String executorNameArgs) {
99         if (type == ConnectionType.PASSIVE) {
100             switch (socketConnType) {
101                 case SSL:
102                     return THREAD_FACTORY_SSL;
103                 case NON_SSL:
104                     return THREAD_FACTORY_NON_SSL;
105                 default:
106                     return Executors.defaultThreadFactory();
107             }
108         } else if (type == ConnectionType.ACTIVE) {
109             ThreadFactory threadFactorySSL =
110                 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConn-" + executorNameArgs + "-%d")
111                     .build();
112             return threadFactorySSL;
113         }
114         // Default case
115         return Executors.defaultThreadFactory();
116     }
117
118     OvsdbClientImpl() {
119     }
120
121     void setupUpdateListener() {
122         if (rpcCallback == null) {
123             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
124                 @Override
125                 public void update(final Object node, final UpdateNotification updateNotification) {
126                     String key = updateNotification.getContext();
127                     CallbackContext callbackContext = monitorCallbacks.get(key);
128                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
129                     if (monitorCallBack == null) {
130                         //ignore ?
131                         LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
132                         return;
133                     }
134                     TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
135                             callbackContext.schema);
136                     monitorCallBack.update(updates, callbackContext.schema);
137                 }
138
139                 @Override
140                 public void locked(final Object node, final List<String> ids) {
141
142                 }
143
144                 @Override
145                 public void stolen(final Object node, final List<String> ids) {
146
147                 }
148             };
149             this.rpcCallback = temp;
150             rpc.registerCallback(temp);
151         }
152     }
153
154
155     protected TableUpdates transformingCallback(final JsonNode tableUpdatesJson, final DatabaseSchema dbSchema) {
156         //todo(ashwin): we should move all the JSON parsing logic to a utility class
157         if (tableUpdatesJson instanceof ObjectNode) {
158             Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
159             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
160             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
161                 Map.Entry<String, JsonNode> entry = itr.next();
162
163                 DatabaseSchema databaseSchema = this.schemas.get(dbSchema.getName());
164                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
165                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
166
167             }
168             return new TableUpdates(tableUpdateMap);
169         }
170         return null;
171     }
172
173     @Override
174     public ListenableFuture<List<OperationResult>> transact(final DatabaseSchema dbSchema,
175             final List<Operation> operations) {
176
177         //todo, we may not need transactionbuilder if we can have JSON objects
178         TransactBuilder builder = new TransactBuilder(dbSchema);
179         for (Operation operation : operations) {
180             builder.addOperation(operation);
181         }
182
183         return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
184     }
185
186     @Override
187     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
188                                                            final List<MonitorRequest> monitorRequest,
189                                                            final MonitorCallBack callback) {
190         return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
191     }
192
193     @Override
194     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
195                                                            final List<MonitorRequest> monitorRequest,
196                                                            final MonitorCallBack callback,
197                                                            final int timeout) {
198         return monitor(dbSchema, monitorRequest, new MonitorHandle(UUID.randomUUID().toString()), callback, timeout);
199     }
200
201     @Override
202     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
203                                                            final List<MonitorRequest> monitorRequest,
204                                                            final MonitorHandle monitorHandle,
205                                                            final MonitorCallBack callback) {
206         return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
207     }
208
209     @Override
210     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
211                                                            final List<MonitorRequest> monitorRequest,
212                                                            final MonitorHandle monitorHandle,
213                                                            final MonitorCallBack callback,
214                                                            final int timeout) {
215         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
216                 MonitorRequest::getTableName);
217
218         registerCallback(monitorHandle, callback, dbSchema);
219
220         final ListenableFuture<JsonNode> monitor = rpc.monitor(
221             () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
222         final JsonNode result;
223         try {
224             if (timeout == NO_TIMEOUT) {
225                 result = monitor.get();
226             } else {
227                 result = monitor.get(timeout, TimeUnit.SECONDS);
228             }
229         } catch (InterruptedException | ExecutionException | TimeoutException e) {
230             LOG.warn("Failed to monitor {}", dbSchema, e);
231             return null;
232         }
233         return transformingCallback(result, dbSchema);
234     }
235
236     private void registerCallback(final MonitorHandle monitorHandle, final MonitorCallBack callback,
237             final DatabaseSchema schema) {
238         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
239         setupUpdateListener();
240     }
241
242     @Override
243     public void cancelMonitor(final MonitorHandle handler) {
244         cancelMonitor(handler, NO_TIMEOUT);
245     }
246
247     @Override
248     public void cancelMonitor(final MonitorHandle handler, final int timeout) {
249         ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
250
251         JsonNode result = null;
252         try {
253             if (timeout == NO_TIMEOUT) {
254                 result = cancelMonitor.get();
255             } else {
256                 result = cancelMonitor.get(timeout, TimeUnit.SECONDS);
257             }
258         } catch (InterruptedException | ExecutionException | TimeoutException e) {
259             LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
260         }
261
262         if (result == null) {
263             LOG.error("Fail to cancel monitor with handler {}", handler.getId());
264         } else {
265             LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
266         }
267     }
268
269     @Override
270     public ListenableFuture<List<String>> echo() {
271         return rpc.echo();
272     }
273
274     @Override
275     public void lock(final String lockId, final LockAquisitionCallback lockedCallBack,
276             final LockStolenCallback stolenCallback) {
277         throw new UnsupportedOperationException("not yet implemented");
278     }
279
280     @Override
281     public ListenableFuture<Boolean> steal(final String lockId) {
282         throw new UnsupportedOperationException("not yet implemented");
283     }
284
285     @Override
286     public ListenableFuture<Boolean> unLock(final String lockId) {
287         throw new UnsupportedOperationException("not yet implemented");
288     }
289
290     @Override
291     public void startEchoService(final EchoServiceCallbackFilters callbackFilters) {
292         throw new UnsupportedOperationException("not yet implemented");
293     }
294
295     @Override
296     public void stopEchoService() {
297         throw new UnsupportedOperationException("not yet implemented");
298     }
299
300     @Override
301     public TransactionBuilder transactBuilder(final DatabaseSchema dbSchema) {
302         return new TransactionBuilder(this, dbSchema);
303     }
304
305
306     public boolean isReady(int timeout) throws InterruptedException {
307         while (timeout > 0) {
308             if (!schemas.isEmpty()) {
309                 return true;
310             }
311             Thread.sleep(1000);
312             timeout--;
313         }
314         return false;
315     }
316
317     @Override
318     public ListenableFuture<List<String>> getDatabases() {
319         return rpc.list_dbs();
320     }
321
322     @Override
323     public ListenableFuture<DatabaseSchema> getSchema(final String database) {
324         final TypedDatabaseSchema existing = schemas.get(database);
325         if (existing != null) {
326             return Futures.immediateFuture(existing);
327         }
328
329         return Futures.transform(getSchemaFromDevice(Collections.singletonList(database)), result -> {
330             final DatabaseSchema dbSchema = result.get(database);
331             if (dbSchema == null) {
332                 return null;
333             }
334
335             final TypedDatabaseSchema typedSchema = TypedDatabaseSchema.of(dbSchema.withInternallyGeneratedColumns());
336             final TypedDatabaseSchema raced = schemas.putIfAbsent(database, typedSchema);
337             return raced != null ? raced : typedSchema;
338         }, executorService);
339     }
340
341     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
342         Map<String, DatabaseSchema> schema = new HashMap<>();
343         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
344         populateSchema(dbNames, schema, future);
345         return future;
346     }
347
348     private void populateSchema(final List<String> dbNames,
349                                  final Map<String, DatabaseSchema> schema,
350                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
351
352         if (dbNames == null || dbNames.isEmpty()) {
353             return;
354         }
355
356         Futures.transform(rpc.get_schema(Collections.singletonList(dbNames.get(0))), jsonNode -> {
357             try {
358                 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
359                 if (schema.size() > 1 && !sfuture.isCancelled()) {
360                     populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
361                 } else if (schema.size() == 1) {
362                     sfuture.set(schema);
363                 }
364             } catch (ParsingException e) {
365                 LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
366                 sfuture.setException(e);
367             }
368             return null;
369         }, MoreExecutors.directExecutor());
370     }
371
372     public void setRpc(final OvsdbRPC rpc) {
373         this.rpc = rpc;
374     }
375
376     static class CallbackContext {
377         MonitorCallBack monitorCallBack;
378         DatabaseSchema schema;
379
380         CallbackContext(final MonitorCallBack monitorCallBack, final DatabaseSchema schema) {
381             this.monitorCallBack = monitorCallBack;
382             this.schema = schema;
383         }
384     }
385
386     @Override
387     public TypedDatabaseSchema getDatabaseSchema(final String dbName) {
388         return schemas.get(dbName);
389     }
390
391     /**
392      * This method finds the DatabaseSchema that matches a given Typed Table Class.
393      * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
394      * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
395      *
396      * @param klazz Typed Class that represents a Table
397      * @return DatabaseSchema that matches a Typed Table Class
398      */
399     private <T> TypedDatabaseSchema getDatabaseSchemaForTypedTable(final Class<T> klazz) {
400         final String dbName = TypedReflections.getTableDatabase(klazz);
401         return dbName == null ? null : getDatabaseSchema(dbName);
402     }
403
404     /**
405      * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
406      * given the Typed Table Class.
407      *
408      * @param klazz Typed Interface
409      * @return Proxy wrapper for the actual raw Row class.
410      */
411     @Override
412     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final Class<T> klazz) {
413         return getTypedRowWrapper(klazz, new Row<>());
414     }
415
416     /**
417      * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
418      * DatabaseSchema and Typed Table Class.
419      *
420      * @param dbSchema Database Schema of interest
421      * @param klazz Typed Interface
422      * @return Proxy wrapper for the actual raw Row class.
423      */
424     @Override
425     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final DatabaseSchema dbSchema, final Class<T> klazz) {
426         return dbSchema == null ? null : TypedDatabaseSchema.of(dbSchema).getTypedRowWrapper(klazz, new Row<>());
427     }
428
429     /**
430      * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
431      *
432      * @param klazz Typed Interface
433      * @param row The actual Row that the wrapper is operating on.
434      *            It can be null if the caller is just interested in getting ColumnSchema.
435      * @return Proxy wrapper for the actual raw Row class.
436      */
437     @Override
438
439     public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
440         final TypedDatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
441         return dbSchema == null ? null : dbSchema.getTypedRowWrapper(klazz, row);
442     }
443
444     @Override
445     public OvsdbConnectionInfo getConnectionInfo() {
446         return connectionInfo;
447     }
448
449     @Override
450     public boolean isActive() {
451         return channel.isActive();
452     }
453
454     @Override
455     public void disconnect() {
456         channel.disconnect();
457         executorService.shutdown();
458     }
459
460     @Override
461     public boolean isConnectionPublished() {
462         return isConnectionPublished;
463     }
464
465     @Override
466     public void setConnectionPublished(final boolean connectionPublished) {
467         isConnectionPublished = connectionPublished;
468     }
469 }