2 * Copyright (c) 2014, 2015 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.lib.impl;
11 import com.fasterxml.jackson.databind.JsonNode;
12 import com.fasterxml.jackson.databind.node.ObjectNode;
13 import com.google.common.base.Function;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Lists;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import io.netty.channel.Channel;
22 import java.util.Iterator;
23 import java.util.List;
25 import java.util.UUID;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadFactory;
30 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
31 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
32 import org.opendaylight.ovsdb.lib.LockStolenCallback;
33 import org.opendaylight.ovsdb.lib.MonitorCallBack;
34 import org.opendaylight.ovsdb.lib.MonitorHandle;
35 import org.opendaylight.ovsdb.lib.OvsdbClient;
36 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
37 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
38 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
39 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
40 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
41 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
42 import org.opendaylight.ovsdb.lib.message.TableUpdate;
43 import org.opendaylight.ovsdb.lib.message.TableUpdates;
44 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
45 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
46 import org.opendaylight.ovsdb.lib.notation.Row;
47 import org.opendaylight.ovsdb.lib.operations.Operation;
48 import org.opendaylight.ovsdb.lib.operations.OperationResult;
49 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
50 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
51 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
52 import org.opendaylight.ovsdb.lib.schema.TableSchema;
53 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
54 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
55 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
/**
 * {@link OvsdbClient} implementation that issues its requests through an {@link OvsdbRPC}
 * proxy and is tied to a Netty {@link Channel}.
 */
60 public class OvsdbClientImpl implements OvsdbClient {
// Class-wide SLF4J logger.
62 private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
// Thread pool created in the constructor and shut down by disconnect().
63 private ExecutorService executorService;
// Database schemas keyed by database name; written by getSchema(), read by
// getDatabaseSchema(), isReady() and transformingCallback().
65 private Map<String, DatabaseSchema> schema = Maps.newHashMap();
// Monitor callbacks keyed by the id of the MonitorHandle they were registered under.
66 private Map<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
// Update listener handed to the RPC layer; stays null until setupUpdateListener() creates it.
67 private OvsdbRPC.Callback rpcCallback;
// Connection details built from the channel and connection type in the constructor.
68 private OvsdbConnectionInfo connectionInfo;
// Channel backing this client; queried by isActive() and closed by disconnect().
69 private Channel channel;
// Shared factory whose threads are named "OVSDB-PassiveConnection-SSL-<n>".
70 private static final ThreadFactory threadFactorySSL =
71 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
// Shared factory whose threads are named "OVSDB-PassiveConnection-Non-SSL-<n>".
72 private static final ThreadFactory threadFactoryNonSSL =
73 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
75 public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type,
76 SocketConnectionType socketConnType) {
78 ThreadFactory threadFactory =
79 getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
80 this.executorService = Executors.newCachedThreadPool(threadFactory);
81 this.channel = channel;
82 this.connectionInfo = new OvsdbConnectionInfo(channel, type);
86 * Genereate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
87 * @param type ACTIVE or PASSIVE {@link ConnectionType}
88 * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
89 * @param executorNameArgs Additional args to append to thread name format
90 * @return {@link ThreadFactory}
92 private ThreadFactory getThreadFactory(ConnectionType type,
93 SocketConnectionType socketConnType, String executorNameArgs) {
94 if (type == ConnectionType.PASSIVE) {
95 switch (socketConnType) {
97 return threadFactorySSL;
99 return threadFactoryNonSSL;
101 return Executors.defaultThreadFactory();
103 } else if (type == ConnectionType.ACTIVE) {
104 ThreadFactory threadFactorySSL =
105 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConnection-" + executorNameArgs + "-%d")
107 return threadFactorySSL;
110 return Executors.defaultThreadFactory();
116 void setupUpdateListener() {
117 if (rpcCallback == null) {
118 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
120 public void update(Object node, UpdateNotification updateNotification) {
121 Object key = updateNotification.getContext();
122 CallbackContext callbackContext = monitorCallbacks.get(key);
123 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
124 if (monitorCallBack == null) {
126 LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
129 TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
130 callbackContext.schema);
131 monitorCallBack.update(updates, callbackContext.schema);
135 public void locked(Object node, List<String> ids) {
140 public void stolen(Object node, List<String> ids) {
144 this.rpcCallback = temp;
145 rpc.registerCallback(temp);
150 protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
151 //todo(ashwin): we should move all the JSON parsing logic to a utility class
152 if (tableUpdatesJson instanceof ObjectNode) {
153 Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
154 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
155 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
156 Map.Entry<String, JsonNode> entry = itr.next();
158 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
159 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
160 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
163 return new TableUpdates(tableUpdateMap);
169 public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
171 //todo, we may not need transactionbuilder if we can have JSON objects
172 TransactBuilder builder = new TransactBuilder(dbSchema);
173 for (Operation operation : operations) {
174 builder.addOperation(operation);
177 return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
181 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
182 List<MonitorRequest> monitorRequest,
183 final MonitorCallBack callback) {
185 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
186 new Function<MonitorRequest, String>() {
188 public String apply(MonitorRequest input) {
189 return input.getTableName();
193 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
194 registerCallback(monitorHandle, callback, dbSchema);
196 ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
198 public List<Object> params() {
199 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
204 result = monitor.get();
205 } catch (InterruptedException | ExecutionException e) {
206 LOG.warn("Failed to monitor {}", dbSchema, e);
209 return transformingCallback(result, dbSchema);
213 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
214 List<MonitorRequest> monitorRequest,
215 final MonitorHandle monitorHandle,
216 final MonitorCallBack callback) {
218 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
219 new Function<MonitorRequest, String>() {
221 public String apply(MonitorRequest input) {
222 return input.getTableName();
226 registerCallback(monitorHandle, callback, dbSchema);
228 ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
230 public List<Object> params() {
231 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
236 result = monitor.get();
237 } catch (InterruptedException | ExecutionException e) {
238 LOG.warn("Failed to monitor {}", dbSchema, e);
241 return transformingCallback(result, dbSchema);
244 private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
245 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
246 setupUpdateListener();
250 public void cancelMonitor(final MonitorHandle handler) {
251 ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(new Params() {
253 public List<Object> params() {
254 return Lists.<Object>newArrayList(handler.getId());
258 JsonNode result = null;
260 result = cancelMonitor.get();
261 } catch (InterruptedException | ExecutionException e) {
262 LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
265 if (result == null) {
266 LOG.error("Fail to cancel monitor with handler {}", handler.getId());
268 LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
/**
 * Echo operation of this client; the result is delivered as a future holding a list of
 * strings.
 *
 * <p>NOTE(review): the method body is not part of this excerpt. Presumably it forwards to
 * the RPC layer; confirm before relying on this description.
 */
273 public ListenableFuture<List<String>> echo() {
278 public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
279 throw new UnsupportedOperationException("not yet implemented");
283 public ListenableFuture<Boolean> steal(String lockId) {
284 throw new UnsupportedOperationException("not yet implemented");
288 public ListenableFuture<Boolean> unLock(String lockId) {
289 throw new UnsupportedOperationException("not yet implemented");
293 public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
294 throw new UnsupportedOperationException("not yet implemented");
298 public void stopEchoService() {
299 throw new UnsupportedOperationException("not yet implemented");
303 public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
304 return new TransactionBuilder(this, dbSchema);
/**
 * Checks, while the timeout budget lasts, whether at least one database schema is cached.
 *
 * @param timeout loop budget; the loop body runs only while this value is positive
 * @return NOTE(review): the return statements are outside this excerpt. Presumably
 *         {@code true} once a schema is cached and {@code false} when the budget runs
 *         out; confirm
 * @throws InterruptedException declared by the signature. NOTE(review): presumably raised
 *         by a wait between checks that is not visible here; confirm
 */
308 public boolean isReady(int timeout) throws InterruptedException {
// Keep checking for as long as some of the timeout budget is left.
309 while (timeout > 0) {
// A non-empty schema cache is the condition being tested for.
310 if (!schema.isEmpty()) {
320 public ListenableFuture<List<String>> getDatabases() {
321 return rpc.list_dbs();
/**
 * Returns the schema of the named database, answering from the local cache when a copy
 * exists and otherwise fetching it through {@code getSchemaFromDevice}.
 *
 * <p>NOTE(review): the tail of the {@code Futures.transform} call and the function's return
 * statements are outside this excerpt; confirm what is returned when the fetched map does
 * not contain the database.
 *
 * @param database name of the database
 * @return future of the schema
 */
325 public ListenableFuture<DatabaseSchema> getSchema(final String database) {
// Look in the local cache first.
327 DatabaseSchema databaseSchema = schema.get(database);
// Cache miss: fetch the schema and post-process the fetched map.
329 if (databaseSchema == null) {
330 return Futures.transform(
331 getSchemaFromDevice(Lists.newArrayList(database)),
332 new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
// Runs on the map of fetched schemas, keyed by database name.
334 public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
335 if (result.containsKey(database)) {
336 DatabaseSchema dbSchema = result.get(database);
// Let the schema add its internally generated columns, then cache it for later calls.
337 dbSchema.populateInternallyGeneratedColumns();
338 OvsdbClientImpl.this.schema.put(database, dbSchema);
// Cache hit: answer with an already-completed future.
346 return Futures.immediateFuture(databaseSchema);
350 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
351 Map<String, DatabaseSchema> schema = Maps.newHashMap();
352 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
353 populateSchema(dbNames, schema, future);
357 private void populateSchema(final List<String> dbNames,
358 final Map<String, DatabaseSchema> schema,
359 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
361 if (dbNames == null || dbNames.isEmpty()) {
365 Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
366 new com.google.common.base.Function<JsonNode, Void>() {
368 public Void apply(JsonNode jsonNode) {
370 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
371 if (schema.size() > 1 && !sfuture.isCancelled()) {
372 populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
373 } else if (schema.size() == 1) {
376 } catch (Exception e) {
377 LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
378 sfuture.setException(e);
/**
 * Setter taking the {@link OvsdbRPC} proxy for this client.
 *
 * <p>NOTE(review): the method body is not part of this excerpt. Presumably it stores the
 * argument in the rpc field used by the request methods; confirm.
 *
 * @param rpc RPC proxy
 */
385 public void setRpc(OvsdbRPC rpc) {
389 static class CallbackContext {
390 MonitorCallBack monitorCallBack;
391 DatabaseSchema schema;
393 CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
394 this.monitorCallBack = monitorCallBack;
395 this.schema = schema;
400 public DatabaseSchema getDatabaseSchema(String dbName) {
401 return schema.get(dbName);
405 * This method finds the DatabaseSchema that matches a given Typed Table Class.
406 * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
407 * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
409 * @param klazz Typed Class that represents a Table
410 * @return DatabaseSchema that matches a Typed Table Class
412 private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
413 TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
414 if (typedTable != null) {
415 return this.getDatabaseSchema(typedTable.database());
421 * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
422 * given the Typed Table Class.
424 * @param klazz Typed Interface
425 * @return Proxy wrapper for the actual raw Row class.
428 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
429 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
430 return this.createTypedRowWrapper(dbSchema, klazz);
434 * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
435 * DatabaseSchema and Typed Table Class.
437 * @param dbSchema Database Schema of interest
438 * @param klazz Typed Interface
439 * @return Proxy wrapper for the actual raw Row class.
442 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
443 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<GenericTableSchema>());
447 * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
449 * @param klazz Typed Interface
450 * @param row The actual Row that the wrapper is operating on.
451 * It can be null if the caller is just interested in getting ColumnSchema.
452 * @return Proxy wrapper for the actual raw Row class.
456 public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
457 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
458 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
462 public OvsdbConnectionInfo getConnectionInfo() {
463 return connectionInfo;
467 public boolean isActive() {
468 return channel.isActive();
/**
 * Disconnects the underlying channel and shuts down this client's executor service.
 */
472 public void disconnect() {
// Ask the channel to disconnect.
473 channel.disconnect();
// Orderly shutdown of the thread pool created in the constructor.
474 executorService.shutdown();