372a6ed56a8418ebe33364c648375e373d5401f5
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbClientImpl.java
1 /*
2  * Copyright © 2014, 2017 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.lib.impl;
9
10 import com.fasterxml.jackson.databind.JsonNode;
11 import com.fasterxml.jackson.databind.node.ObjectNode;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import io.netty.channel.Channel;
20 import java.util.Arrays;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.UUID;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
34 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
35 import org.opendaylight.ovsdb.lib.LockStolenCallback;
36 import org.opendaylight.ovsdb.lib.MonitorCallBack;
37 import org.opendaylight.ovsdb.lib.MonitorHandle;
38 import org.opendaylight.ovsdb.lib.OvsdbClient;
39 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
40 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
41 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
42 import org.opendaylight.ovsdb.lib.error.ParsingException;
43 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
44 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
45 import org.opendaylight.ovsdb.lib.message.TableUpdate;
46 import org.opendaylight.ovsdb.lib.message.TableUpdates;
47 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
48 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
49 import org.opendaylight.ovsdb.lib.notation.Row;
50 import org.opendaylight.ovsdb.lib.operations.Operation;
51 import org.opendaylight.ovsdb.lib.operations.OperationResult;
52 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
53 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
54 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
55 import org.opendaylight.ovsdb.lib.schema.TableSchema;
56 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
57 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
58 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 public class OvsdbClientImpl implements OvsdbClient {
63
64     private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
65     private ExecutorService executorService;
66     private OvsdbRPC rpc;
67     private final Map<String, DatabaseSchema> schemas = new HashMap<>();
68     private final Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
69     private OvsdbRPC.Callback rpcCallback;
70     private OvsdbConnectionInfo connectionInfo;
71     private Channel channel;
72     private boolean isConnectionPublished;
73     private static final int NO_TIMEOUT = -1;
74
75     private static final ThreadFactory THREAD_FACTORY_SSL =
76         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
77     private static final ThreadFactory THREAD_FACTORY_NON_SSL =
78         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
79
80     public OvsdbClientImpl(final OvsdbRPC rpc, final Channel channel, final ConnectionType type,
81         final SocketConnectionType socketConnType) {
82         this.rpc = rpc;
83         ThreadFactory threadFactory =
84             getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
85         this.executorService = Executors.newCachedThreadPool(threadFactory);
86         this.channel = channel;
87         this.connectionInfo = new OvsdbConnectionInfo(channel, type);
88     }
89
90     /**
91      * Generate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
92      * @param type ACTIVE or PASSIVE {@link ConnectionType}
93      * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
94      * @param executorNameArgs Additional args to append to thread name format
95      * @return {@link ThreadFactory}
96      */
97     private ThreadFactory getThreadFactory(final ConnectionType type,
98         final SocketConnectionType socketConnType, final String executorNameArgs) {
99         if (type == ConnectionType.PASSIVE) {
100             switch (socketConnType) {
101                 case SSL:
102                     return THREAD_FACTORY_SSL;
103                 case NON_SSL:
104                     return THREAD_FACTORY_NON_SSL;
105                 default:
106                     return Executors.defaultThreadFactory();
107             }
108         } else if (type == ConnectionType.ACTIVE) {
109             ThreadFactory threadFactorySSL =
110                 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConn-" + executorNameArgs + "-%d")
111                     .build();
112             return threadFactorySSL;
113         }
114         // Default case
115         return Executors.defaultThreadFactory();
116     }
117
118     OvsdbClientImpl() {
119     }
120
121     void setupUpdateListener() {
122         if (rpcCallback == null) {
123             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
124                 @Override
125                 public void update(final Object node, final UpdateNotification updateNotification) {
126                     String key = updateNotification.getContext();
127                     CallbackContext callbackContext = monitorCallbacks.get(key);
128                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
129                     if (monitorCallBack == null) {
130                         //ignore ?
131                         LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
132                         return;
133                     }
134                     TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
135                             callbackContext.schema);
136                     monitorCallBack.update(updates, callbackContext.schema);
137                 }
138
139                 @Override
140                 public void locked(final Object node, final List<String> ids) {
141
142                 }
143
144                 @Override
145                 public void stolen(final Object node, final List<String> ids) {
146
147                 }
148             };
149             this.rpcCallback = temp;
150             rpc.registerCallback(temp);
151         }
152     }
153
154
155     protected TableUpdates transformingCallback(final JsonNode tableUpdatesJson, final DatabaseSchema dbSchema) {
156         //todo(ashwin): we should move all the JSON parsing logic to a utility class
157         if (tableUpdatesJson instanceof ObjectNode) {
158             Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
159             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
160             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
161                 Map.Entry<String, JsonNode> entry = itr.next();
162
163                 DatabaseSchema databaseSchema = this.schemas.get(dbSchema.getName());
164                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
165                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
166
167             }
168             return new TableUpdates(tableUpdateMap);
169         }
170         return null;
171     }
172
173     @Override
174     public ListenableFuture<List<OperationResult>> transact(final DatabaseSchema dbSchema,
175             final List<Operation> operations) {
176
177         //todo, we may not need transactionbuilder if we can have JSON objects
178         TransactBuilder builder = new TransactBuilder(dbSchema);
179         for (Operation operation : operations) {
180             builder.addOperation(operation);
181         }
182
183         return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
184     }
185
186     @Override
187     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
188                                                            final List<MonitorRequest> monitorRequest,
189                                                            final MonitorCallBack callback) {
190         return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
191     }
192
193     @Override
194     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
195                                                             final List<MonitorRequest> monitorRequest,
196                                                             final MonitorCallBack callback,
197                                                             final int timeout) {
198
199         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
200                 MonitorRequest::getTableName);
201
202         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
203         registerCallback(monitorHandle, callback, dbSchema);
204
205         ListenableFuture<JsonNode> monitor = rpc.monitor(
206             () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
207         JsonNode result;
208         try {
209             if (timeout == NO_TIMEOUT) {
210                 result = monitor.get();
211             } else {
212                 result = monitor.get(timeout, TimeUnit.SECONDS);
213             }
214         } catch (InterruptedException | ExecutionException | TimeoutException e) {
215             LOG.warn("Failed to monitor {}", dbSchema, e);
216             return null;
217         }
218         return transformingCallback(result, dbSchema);
219     }
220
221     @Override
222     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
223                                                            final List<MonitorRequest> monitorRequest,
224                                                            final MonitorHandle monitorHandle,
225                                                            final MonitorCallBack callback) {
226         return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
227     }
228
229     @Override
230     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
231                                                            final List<MonitorRequest> monitorRequest,
232                                                            final MonitorHandle monitorHandle,
233                                                            final MonitorCallBack callback,
234                                                            final int timeout) {
235
236         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
237                 MonitorRequest::getTableName);
238
239         registerCallback(monitorHandle, callback, dbSchema);
240
241         ListenableFuture<JsonNode> monitor = rpc.monitor(
242             () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
243         JsonNode result;
244         try {
245             if (timeout == NO_TIMEOUT) {
246                 result = monitor.get();
247             } else {
248                 result = monitor.get(timeout, TimeUnit.SECONDS);
249             }
250         } catch (InterruptedException | ExecutionException | TimeoutException e) {
251             LOG.warn("Failed to monitor {}", dbSchema, e);
252             return null;
253         }
254         return transformingCallback(result, dbSchema);
255     }
256
257     private void registerCallback(final MonitorHandle monitorHandle, final MonitorCallBack callback,
258             final DatabaseSchema schema) {
259         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
260         setupUpdateListener();
261     }
262
263     @Override
264     public void cancelMonitor(final MonitorHandle handler) {
265         cancelMonitor(handler, NO_TIMEOUT);
266     }
267
268     @Override
269     public void cancelMonitor(final MonitorHandle handler, final int timeout) {
270         ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
271
272         JsonNode result = null;
273         try {
274             if (timeout == NO_TIMEOUT) {
275                 result = cancelMonitor.get();
276             } else {
277                 result = cancelMonitor.get(timeout, TimeUnit.SECONDS);
278             }
279         } catch (InterruptedException | ExecutionException | TimeoutException e) {
280             LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
281         }
282
283         if (result == null) {
284             LOG.error("Fail to cancel monitor with handler {}", handler.getId());
285         } else {
286             LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
287         }
288     }
289
290     @Override
291     public ListenableFuture<List<String>> echo() {
292         return rpc.echo();
293     }
294
295     @Override
296     public void lock(final String lockId, final LockAquisitionCallback lockedCallBack,
297             final LockStolenCallback stolenCallback) {
298         throw new UnsupportedOperationException("not yet implemented");
299     }
300
301     @Override
302     public ListenableFuture<Boolean> steal(final String lockId) {
303         throw new UnsupportedOperationException("not yet implemented");
304     }
305
306     @Override
307     public ListenableFuture<Boolean> unLock(final String lockId) {
308         throw new UnsupportedOperationException("not yet implemented");
309     }
310
311     @Override
312     public void startEchoService(final EchoServiceCallbackFilters callbackFilters) {
313         throw new UnsupportedOperationException("not yet implemented");
314     }
315
316     @Override
317     public void stopEchoService() {
318         throw new UnsupportedOperationException("not yet implemented");
319     }
320
321     @Override
322     public TransactionBuilder transactBuilder(final DatabaseSchema dbSchema) {
323         return new TransactionBuilder(this, dbSchema);
324     }
325
326
327     public boolean isReady(int timeout) throws InterruptedException {
328         while (timeout > 0) {
329             if (!schemas.isEmpty()) {
330                 return true;
331             }
332             Thread.sleep(1000);
333             timeout--;
334         }
335         return false;
336     }
337
338     @Override
339     public ListenableFuture<List<String>> getDatabases() {
340         return rpc.list_dbs();
341     }
342
343     @Override
344     public ListenableFuture<DatabaseSchema> getSchema(final String database) {
345         final DatabaseSchema existing = schemas.get(database);
346         if (existing != null) {
347             return Futures.immediateFuture(existing);
348         }
349
350         return Futures.transform(getSchemaFromDevice(Collections.singletonList(database)), result -> {
351             DatabaseSchema dbSchema = result.get(database);
352             if (dbSchema == null) {
353                 return null;
354             }
355
356             dbSchema = dbSchema.withInternallyGeneratedColumns();
357             final DatabaseSchema raced = schemas.putIfAbsent(database, dbSchema);
358             return raced != null ? raced : dbSchema;
359         }, executorService);
360     }
361
362     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
363         Map<String, DatabaseSchema> schema = new HashMap<>();
364         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
365         populateSchema(dbNames, schema, future);
366         return future;
367     }
368
369     private void populateSchema(final List<String> dbNames,
370                                  final Map<String, DatabaseSchema> schema,
371                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
372
373         if (dbNames == null || dbNames.isEmpty()) {
374             return;
375         }
376
377         Futures.transform(rpc.get_schema(Collections.singletonList(dbNames.get(0))), jsonNode -> {
378             try {
379                 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
380                 if (schema.size() > 1 && !sfuture.isCancelled()) {
381                     populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
382                 } else if (schema.size() == 1) {
383                     sfuture.set(schema);
384                 }
385             } catch (ParsingException e) {
386                 LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
387                 sfuture.setException(e);
388             }
389             return null;
390         }, MoreExecutors.directExecutor());
391     }
392
393     public void setRpc(final OvsdbRPC rpc) {
394         this.rpc = rpc;
395     }
396
397     static class CallbackContext {
398         MonitorCallBack monitorCallBack;
399         DatabaseSchema schema;
400
401         CallbackContext(final MonitorCallBack monitorCallBack, final DatabaseSchema schema) {
402             this.monitorCallBack = monitorCallBack;
403             this.schema = schema;
404         }
405     }
406
407     @Override
408     public DatabaseSchema getDatabaseSchema(final String dbName) {
409         return schemas.get(dbName);
410     }
411
412     /**
413      * This method finds the DatabaseSchema that matches a given Typed Table Class.
414      * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
415      * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
416      *
417      * @param klazz Typed Class that represents a Table
418      * @return DatabaseSchema that matches a Typed Table Class
419      */
420     private <T> DatabaseSchema getDatabaseSchemaForTypedTable(final Class<T> klazz) {
421         TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
422         if (typedTable != null) {
423             return this.getDatabaseSchema(typedTable.database());
424         }
425         return null;
426     }
427
428     /**
429      * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
430      * given the Typed Table Class.
431      *
432      * @param klazz Typed Interface
433      * @return Proxy wrapper for the actual raw Row class.
434      */
435     @Override
436     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final Class<T> klazz) {
437         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
438         return this.createTypedRowWrapper(dbSchema, klazz);
439     }
440
441     /**
442      * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
443      * DatabaseSchema and Typed Table Class.
444      *
445      * @param dbSchema Database Schema of interest
446      * @param klazz Typed Interface
447      * @return Proxy wrapper for the actual raw Row class.
448      */
449     @Override
450     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final DatabaseSchema dbSchema, final Class<T> klazz) {
451         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<>());
452     }
453
454     /**
455      * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
456      *
457      * @param klazz Typed Interface
458      * @param row The actual Row that the wrapper is operating on.
459      *            It can be null if the caller is just interested in getting ColumnSchema.
460      * @return Proxy wrapper for the actual raw Row class.
461      */
462     @Override
463
464     public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
465         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
466         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
467     }
468
469     @Override
470     public OvsdbConnectionInfo getConnectionInfo() {
471         return connectionInfo;
472     }
473
474     @Override
475     public boolean isActive() {
476         return channel.isActive();
477     }
478
479     @Override
480     public void disconnect() {
481         channel.disconnect();
482         executorService.shutdown();
483     }
484
485     @Override
486     public boolean isConnectionPublished() {
487         return isConnectionPublished;
488     }
489
490     @Override
491     public void setConnectionPublished(final boolean connectionPublished) {
492         isConnectionPublished = connectionPublished;
493     }
494 }