Fix ALOTOF Checkstyle violation, and switch over to enforcement.
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbClientImpl.java
1 /*
2  * Copyright (c) 2014, 2015 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.lib.impl;
10
11 import com.fasterxml.jackson.databind.JsonNode;
12 import com.fasterxml.jackson.databind.node.ObjectNode;
13 import com.google.common.base.Function;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Lists;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import io.netty.channel.Channel;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.UUID;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadFactory;
30 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
31 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
32 import org.opendaylight.ovsdb.lib.LockStolenCallback;
33 import org.opendaylight.ovsdb.lib.MonitorCallBack;
34 import org.opendaylight.ovsdb.lib.MonitorHandle;
35 import org.opendaylight.ovsdb.lib.OvsdbClient;
36 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
37 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
38 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
39 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
40 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
41 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
42 import org.opendaylight.ovsdb.lib.message.TableUpdate;
43 import org.opendaylight.ovsdb.lib.message.TableUpdates;
44 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
45 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
46 import org.opendaylight.ovsdb.lib.notation.Row;
47 import org.opendaylight.ovsdb.lib.operations.Operation;
48 import org.opendaylight.ovsdb.lib.operations.OperationResult;
49 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
50 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
51 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
52 import org.opendaylight.ovsdb.lib.schema.TableSchema;
53 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
54 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
55 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59
60 public class OvsdbClientImpl implements OvsdbClient {
61
62     private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
63     private ExecutorService executorService;
64     private OvsdbRPC rpc;
65     private Map<String, DatabaseSchema> schema = Maps.newHashMap();
66     private Map<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
67     private OvsdbRPC.Callback rpcCallback;
68     private OvsdbConnectionInfo connectionInfo;
69     private Channel channel;
70     private static final ThreadFactory threadFactorySSL =
71         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
72     private static final ThreadFactory threadFactoryNonSSL =
73         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
74
75     public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type,
76         SocketConnectionType socketConnType) {
77         this.rpc = rpc;
78         ThreadFactory threadFactory =
79             getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
80         this.executorService = Executors.newCachedThreadPool(threadFactory);
81         this.channel = channel;
82         this.connectionInfo = new OvsdbConnectionInfo(channel, type);
83     }
84
85     /**
86      * Genereate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
87      * @param type ACTIVE or PASSIVE {@link ConnectionType}
88      * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
89      * @param executorNameArgs Additional args to append to thread name format
90      * @return {@link ThreadFactory}
91      */
92     private ThreadFactory getThreadFactory(ConnectionType type,
93         SocketConnectionType socketConnType, String executorNameArgs) {
94         if (type == ConnectionType.PASSIVE) {
95             switch (socketConnType) {
96                 case SSL:
97                     return threadFactorySSL;
98                 case NON_SSL:
99                     return threadFactoryNonSSL;
100                 default:
101                     return Executors.defaultThreadFactory();
102             }
103         } else if (type == ConnectionType.ACTIVE) {
104             ThreadFactory threadFactorySSL =
105                 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConnection-" + executorNameArgs + "-%d")
106                     .build();
107             return threadFactorySSL;
108         }
109         // Default case
110         return Executors.defaultThreadFactory();
111     }
112
113     OvsdbClientImpl() {
114     }
115
116     void setupUpdateListener() {
117         if (rpcCallback == null) {
118             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
119                 @Override
120                 public void update(Object node, UpdateNotification updateNotification) {
121                     Object key = updateNotification.getContext();
122                     CallbackContext callbackContext = monitorCallbacks.get(key);
123                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
124                     if (monitorCallBack == null) {
125                         //ignore ?
126                         LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
127                         return;
128                     }
129                     TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
130                             callbackContext.schema);
131                     monitorCallBack.update(updates, callbackContext.schema);
132                 }
133
134                 @Override
135                 public void locked(Object node, List<String> ids) {
136
137                 }
138
139                 @Override
140                 public void stolen(Object node, List<String> ids) {
141
142                 }
143             };
144             this.rpcCallback = temp;
145             rpc.registerCallback(temp);
146         }
147     }
148
149
150     protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
151         //todo(ashwin): we should move all the JSON parsing logic to a utility class
152         if (tableUpdatesJson instanceof ObjectNode) {
153             Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
154             ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
155             for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
156                 Map.Entry<String, JsonNode> entry = itr.next();
157
158                 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
159                 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
160                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
161
162             }
163             return new TableUpdates(tableUpdateMap);
164         }
165         return null;
166     }
167
168     @Override
169     public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
170
171         //todo, we may not need transactionbuilder if we can have JSON objects
172         TransactBuilder builder = new TransactBuilder(dbSchema);
173         for (Operation operation : operations) {
174             builder.addOperation(operation);
175         }
176
177         return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
178     }
179
180     @Override
181     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
182                                                             List<MonitorRequest> monitorRequest,
183                                                             final MonitorCallBack callback) {
184
185         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
186                 new Function<MonitorRequest, String>() {
187                     @Override
188                     public String apply(MonitorRequest input) {
189                         return input.getTableName();
190                     }
191                 });
192
193         final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
194         registerCallback(monitorHandle, callback, dbSchema);
195
196         ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
197             @Override
198             public List<Object> params() {
199                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
200             }
201         });
202         JsonNode result;
203         try {
204             result = monitor.get();
205         } catch (InterruptedException | ExecutionException e) {
206             LOG.warn("Failed to monitor {}", dbSchema, e);
207             return null;
208         }
209         return transformingCallback(result, dbSchema);
210     }
211
212     @Override
213     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
214                                                            List<MonitorRequest> monitorRequest,
215                                                            final MonitorHandle monitorHandle,
216                                                            final MonitorCallBack callback) {
217
218         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
219                 new Function<MonitorRequest, String>() {
220                     @Override
221                     public String apply(MonitorRequest input) {
222                         return input.getTableName();
223                     }
224                 });
225
226         registerCallback(monitorHandle, callback, dbSchema);
227
228         ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
229             @Override
230             public List<Object> params() {
231                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
232             }
233         });
234         JsonNode result;
235         try {
236             result = monitor.get();
237         } catch (InterruptedException | ExecutionException e) {
238             LOG.warn("Failed to monitor {}", dbSchema, e);
239             return null;
240         }
241         return transformingCallback(result, dbSchema);
242     }
243
244     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
245         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
246         setupUpdateListener();
247     }
248
249     @Override
250     public void cancelMonitor(final MonitorHandle handler) {
251         ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(new Params() {
252             @Override
253             public List<Object> params() {
254                 return Lists.<Object>newArrayList(handler.getId());
255             }
256         });
257
258         JsonNode result = null;
259         try {
260             result = cancelMonitor.get();
261         } catch (InterruptedException | ExecutionException e) {
262             LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
263         }
264
265         if (result == null) {
266             LOG.error("Fail to cancel monitor with handler {}", handler.getId());
267         } else {
268             LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
269         }
270     }
271
272     @Override
273     public ListenableFuture<List<String>> echo() {
274         return rpc.echo();
275     }
276
277     @Override
278     public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
279         throw new UnsupportedOperationException("not yet implemented");
280     }
281
282     @Override
283     public ListenableFuture<Boolean> steal(String lockId) {
284         throw new UnsupportedOperationException("not yet implemented");
285     }
286
287     @Override
288     public ListenableFuture<Boolean> unLock(String lockId) {
289         throw new UnsupportedOperationException("not yet implemented");
290     }
291
292     @Override
293     public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
294         throw new UnsupportedOperationException("not yet implemented");
295     }
296
297     @Override
298     public void stopEchoService() {
299         throw new UnsupportedOperationException("not yet implemented");
300     }
301
302     @Override
303     public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
304         return new TransactionBuilder(this, dbSchema);
305     }
306
307
308     public boolean isReady(int timeout) throws InterruptedException {
309         while (timeout > 0) {
310             if (!schema.isEmpty()) {
311                 return true;
312             }
313             Thread.sleep(1000);
314             timeout--;
315         }
316         return false;
317     }
318
319     @Override
320     public ListenableFuture<List<String>> getDatabases() {
321         return rpc.list_dbs();
322     }
323
324     @Override
325     public ListenableFuture<DatabaseSchema> getSchema(final String database) {
326
327         DatabaseSchema databaseSchema = schema.get(database);
328
329         if (databaseSchema == null) {
330             return Futures.transform(
331                     getSchemaFromDevice(Lists.newArrayList(database)),
332                     new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
333                         @Override
334                         public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
335                             if (result.containsKey(database)) {
336                                 DatabaseSchema dbSchema = result.get(database);
337                                 dbSchema.populateInternallyGeneratedColumns();
338                                 OvsdbClientImpl.this.schema.put(database, dbSchema);
339                                 return dbSchema;
340                             } else {
341                                 return null;
342                             }
343                         }
344                     }, executorService);
345         } else {
346             return Futures.immediateFuture(databaseSchema);
347         }
348     }
349
350     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
351         Map<String, DatabaseSchema> schema = Maps.newHashMap();
352         SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
353         populateSchema(dbNames, schema, future);
354         return future;
355     }
356
357     private void populateSchema(final List<String> dbNames,
358                                  final Map<String, DatabaseSchema> schema,
359                                  final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
360
361         if (dbNames == null || dbNames.isEmpty()) {
362             return;
363         }
364
365         Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
366                 new com.google.common.base.Function<JsonNode, Void>() {
367                     @Override
368                     public Void apply(JsonNode jsonNode) {
369                         try {
370                             schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
371                             if (schema.size() > 1 && !sfuture.isCancelled()) {
372                                 populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
373                             } else if (schema.size() == 1) {
374                                 sfuture.set(schema);
375                             }
376                         } catch (Exception e) {
377                             LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
378                             sfuture.setException(e);
379                         }
380                         return null;
381                     }
382                 });
383     }
384
385     public void setRpc(OvsdbRPC rpc) {
386         this.rpc = rpc;
387     }
388
389     static class CallbackContext {
390         MonitorCallBack monitorCallBack;
391         DatabaseSchema schema;
392
393         CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
394             this.monitorCallBack = monitorCallBack;
395             this.schema = schema;
396         }
397     }
398
399     @Override
400     public DatabaseSchema getDatabaseSchema(String dbName) {
401         return schema.get(dbName);
402     }
403
404     /**
405      * This method finds the DatabaseSchema that matches a given Typed Table Class.
406      * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
407      * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
408      *
409      * @param klazz Typed Class that represents a Table
410      * @return DatabaseSchema that matches a Typed Table Class
411      */
412     private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
413         TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
414         if (typedTable != null) {
415             return this.getDatabaseSchema(typedTable.database());
416         }
417         return null;
418     }
419
420     /**
421      * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
422      * given the Typed Table Class.
423      *
424      * @param klazz Typed Interface
425      * @return Proxy wrapper for the actual raw Row class.
426      */
427     @Override
428     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
429         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
430         return this.createTypedRowWrapper(dbSchema, klazz);
431     }
432
433     /**
434      * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
435      * DatabaseSchema and Typed Table Class.
436      *
437      * @param dbSchema Database Schema of interest
438      * @param klazz Typed Interface
439      * @return Proxy wrapper for the actual raw Row class.
440      */
441     @Override
442     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
443         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<GenericTableSchema>());
444     }
445
446     /**
447      * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
448      *
449      * @param klazz Typed Interface
450      * @param row The actual Row that the wrapper is operating on.
451      *            It can be null if the caller is just interested in getting ColumnSchema.
452      * @return Proxy wrapper for the actual raw Row class.
453      */
454     @Override
455
456     public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
457         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
458         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
459     }
460
461     @Override
462     public OvsdbConnectionInfo getConnectionInfo() {
463         return connectionInfo;
464     }
465
466     @Override
467     public boolean isActive() {
468         return channel.isActive();
469     }
470
471     @Override
472     public void disconnect() {
473         channel.disconnect();
474         executorService.shutdown();
475     }
476 }