2 * Copyright © 2014, 2017 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.lib.impl;
10 import com.fasterxml.jackson.databind.JsonNode;
11 import com.fasterxml.jackson.databind.node.ObjectNode;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import io.netty.channel.Channel;
20 import java.util.Arrays;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.UUID;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
34 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
35 import org.opendaylight.ovsdb.lib.LockStolenCallback;
36 import org.opendaylight.ovsdb.lib.MonitorCallBack;
37 import org.opendaylight.ovsdb.lib.MonitorHandle;
38 import org.opendaylight.ovsdb.lib.OvsdbClient;
39 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
40 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
41 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
42 import org.opendaylight.ovsdb.lib.error.ParsingException;
43 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
44 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
45 import org.opendaylight.ovsdb.lib.message.TableUpdate;
46 import org.opendaylight.ovsdb.lib.message.TableUpdates;
47 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
48 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
49 import org.opendaylight.ovsdb.lib.notation.Row;
50 import org.opendaylight.ovsdb.lib.operations.Operation;
51 import org.opendaylight.ovsdb.lib.operations.OperationResult;
52 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
53 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
54 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
55 import org.opendaylight.ovsdb.lib.schema.TableSchema;
56 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
57 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
58 import org.opendaylight.ovsdb.lib.schema.typed.TypedReflections;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 public class OvsdbClientImpl implements OvsdbClient {
64 private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
65 private ExecutorService executorService;
67 private final Map<String, TypedDatabaseSchema> schemas = new HashMap<>();
68 private final Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
69 private OvsdbRPC.Callback rpcCallback;
70 private OvsdbConnectionInfo connectionInfo;
71 private Channel channel;
72 private boolean isConnectionPublished;
73 private static final int NO_TIMEOUT = -1;
75 private static final ThreadFactory THREAD_FACTORY_SSL =
76 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
77 private static final ThreadFactory THREAD_FACTORY_NON_SSL =
78 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
80 public OvsdbClientImpl(final OvsdbRPC rpc, final Channel channel, final ConnectionType type,
81 final SocketConnectionType socketConnType) {
83 ThreadFactory threadFactory =
84 getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
85 this.executorService = Executors.newCachedThreadPool(threadFactory);
86 this.channel = channel;
87 this.connectionInfo = new OvsdbConnectionInfo(channel, type);
91 * Generate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
92 * @param type ACTIVE or PASSIVE {@link ConnectionType}
93 * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
94 * @param executorNameArgs Additional args to append to thread name format
95 * @return {@link ThreadFactory}
97 private ThreadFactory getThreadFactory(final ConnectionType type,
98 final SocketConnectionType socketConnType, final String executorNameArgs) {
99 if (type == ConnectionType.PASSIVE) {
100 switch (socketConnType) {
102 return THREAD_FACTORY_SSL;
104 return THREAD_FACTORY_NON_SSL;
106 return Executors.defaultThreadFactory();
108 } else if (type == ConnectionType.ACTIVE) {
109 ThreadFactory threadFactorySSL =
110 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConn-" + executorNameArgs + "-%d")
112 return threadFactorySSL;
115 return Executors.defaultThreadFactory();
121 void setupUpdateListener() {
122 if (rpcCallback == null) {
123 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
125 public void update(final Object node, final UpdateNotification updateNotification) {
126 String key = updateNotification.getContext();
127 CallbackContext callbackContext = monitorCallbacks.get(key);
128 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
129 if (monitorCallBack == null) {
131 LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
134 TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
135 callbackContext.schema);
136 monitorCallBack.update(updates, callbackContext.schema);
140 public void locked(final Object node, final List<String> ids) {
145 public void stolen(final Object node, final List<String> ids) {
149 this.rpcCallback = temp;
150 rpc.registerCallback(temp);
155 protected TableUpdates transformingCallback(final JsonNode tableUpdatesJson, final DatabaseSchema dbSchema) {
156 //todo(ashwin): we should move all the JSON parsing logic to a utility class
157 if (tableUpdatesJson instanceof ObjectNode) {
158 Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
159 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
160 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
161 Map.Entry<String, JsonNode> entry = itr.next();
163 DatabaseSchema databaseSchema = this.schemas.get(dbSchema.getName());
164 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
165 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
168 return new TableUpdates(tableUpdateMap);
174 public ListenableFuture<List<OperationResult>> transact(final DatabaseSchema dbSchema,
175 final List<Operation> operations) {
177 //todo, we may not need transactionbuilder if we can have JSON objects
178 TransactBuilder builder = new TransactBuilder(dbSchema);
179 for (Operation operation : operations) {
180 builder.addOperation(operation);
183 return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
187 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
188 final List<MonitorRequest> monitorRequest,
189 final MonitorCallBack callback) {
190 return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
194 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
195 final List<MonitorRequest> monitorRequest,
196 final MonitorCallBack callback,
198 return monitor(dbSchema, monitorRequest, new MonitorHandle(UUID.randomUUID().toString()), callback, timeout);
202 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
203 final List<MonitorRequest> monitorRequest,
204 final MonitorHandle monitorHandle,
205 final MonitorCallBack callback) {
206 return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
210 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
211 final List<MonitorRequest> monitorRequest,
212 final MonitorHandle monitorHandle,
213 final MonitorCallBack callback,
215 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
216 MonitorRequest::getTableName);
218 registerCallback(monitorHandle, callback, dbSchema);
220 final ListenableFuture<JsonNode> monitor = rpc.monitor(
221 () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
222 final JsonNode result;
224 if (timeout == NO_TIMEOUT) {
225 result = monitor.get();
227 result = monitor.get(timeout, TimeUnit.SECONDS);
229 } catch (InterruptedException | ExecutionException | TimeoutException e) {
230 LOG.warn("Failed to monitor {}", dbSchema, e);
233 return transformingCallback(result, dbSchema);
236 private void registerCallback(final MonitorHandle monitorHandle, final MonitorCallBack callback,
237 final DatabaseSchema schema) {
238 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
239 setupUpdateListener();
243 public void cancelMonitor(final MonitorHandle handler) {
244 cancelMonitor(handler, NO_TIMEOUT);
248 public void cancelMonitor(final MonitorHandle handler, final int timeout) {
249 ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
251 JsonNode result = null;
253 if (timeout == NO_TIMEOUT) {
254 result = cancelMonitor.get();
256 result = cancelMonitor.get(timeout, TimeUnit.SECONDS);
258 } catch (InterruptedException | ExecutionException | TimeoutException e) {
259 LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
262 if (result == null) {
263 LOG.error("Fail to cancel monitor with handler {}", handler.getId());
265 LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
270 public ListenableFuture<List<String>> echo() {
275 public void lock(final String lockId, final LockAquisitionCallback lockedCallBack,
276 final LockStolenCallback stolenCallback) {
277 throw new UnsupportedOperationException("not yet implemented");
281 public ListenableFuture<Boolean> steal(final String lockId) {
282 throw new UnsupportedOperationException("not yet implemented");
286 public ListenableFuture<Boolean> unLock(final String lockId) {
287 throw new UnsupportedOperationException("not yet implemented");
291 public void startEchoService(final EchoServiceCallbackFilters callbackFilters) {
292 throw new UnsupportedOperationException("not yet implemented");
296 public void stopEchoService() {
297 throw new UnsupportedOperationException("not yet implemented");
301 public TransactionBuilder transactBuilder(final DatabaseSchema dbSchema) {
302 return new TransactionBuilder(this, dbSchema);
306 public boolean isReady(int timeout) throws InterruptedException {
307 while (timeout > 0) {
308 if (!schemas.isEmpty()) {
318 public ListenableFuture<List<String>> getDatabases() {
319 return rpc.list_dbs();
323 public ListenableFuture<DatabaseSchema> getSchema(final String database) {
324 final TypedDatabaseSchema existing = schemas.get(database);
325 if (existing != null) {
326 return Futures.immediateFuture(existing);
329 return Futures.transform(getSchemaFromDevice(Collections.singletonList(database)), result -> {
330 final DatabaseSchema dbSchema = result.get(database);
331 if (dbSchema == null) {
335 final TypedDatabaseSchema typedSchema = TypedDatabaseSchema.of(dbSchema.withInternallyGeneratedColumns());
336 final TypedDatabaseSchema raced = schemas.putIfAbsent(database, typedSchema);
337 return raced != null ? raced : typedSchema;
341 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
342 Map<String, DatabaseSchema> schema = new HashMap<>();
343 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
344 populateSchema(dbNames, schema, future);
348 private void populateSchema(final List<String> dbNames,
349 final Map<String, DatabaseSchema> schema,
350 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
352 if (dbNames == null || dbNames.isEmpty()) {
356 Futures.transform(rpc.get_schema(Collections.singletonList(dbNames.get(0))), jsonNode -> {
358 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
359 if (schema.size() > 1 && !sfuture.isCancelled()) {
360 populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
361 } else if (schema.size() == 1) {
364 } catch (ParsingException e) {
365 LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
366 sfuture.setException(e);
369 }, MoreExecutors.directExecutor());
372 public void setRpc(final OvsdbRPC rpc) {
376 static class CallbackContext {
377 MonitorCallBack monitorCallBack;
378 DatabaseSchema schema;
380 CallbackContext(final MonitorCallBack monitorCallBack, final DatabaseSchema schema) {
381 this.monitorCallBack = monitorCallBack;
382 this.schema = schema;
387 public TypedDatabaseSchema getDatabaseSchema(final String dbName) {
388 return schemas.get(dbName);
392 * This method finds the DatabaseSchema that matches a given Typed Table Class.
393 * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
394 * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
396 * @param klazz Typed Class that represents a Table
397 * @return DatabaseSchema that matches a Typed Table Class
399 private <T> TypedDatabaseSchema getDatabaseSchemaForTypedTable(final Class<T> klazz) {
400 final String dbName = TypedReflections.getTableDatabase(klazz);
401 return dbName == null ? null : getDatabaseSchema(dbName);
405 * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
406 * given the Typed Table Class.
408 * @param klazz Typed Interface
409 * @return Proxy wrapper for the actual raw Row class.
412 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final Class<T> klazz) {
413 return getTypedRowWrapper(klazz, new Row<>());
417 * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
418 * DatabaseSchema and Typed Table Class.
420 * @param dbSchema Database Schema of interest
421 * @param klazz Typed Interface
422 * @return Proxy wrapper for the actual raw Row class.
425 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final DatabaseSchema dbSchema, final Class<T> klazz) {
426 return dbSchema == null ? null : TypedDatabaseSchema.of(dbSchema).getTypedRowWrapper(klazz, new Row<>());
430 * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
432 * @param klazz Typed Interface
433 * @param row The actual Row that the wrapper is operating on.
434 * It can be null if the caller is just interested in getting ColumnSchema.
435 * @return Proxy wrapper for the actual raw Row class.
439 public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
440 final TypedDatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
441 return dbSchema == null ? null : dbSchema.getTypedRowWrapper(klazz, row);
445 public OvsdbConnectionInfo getConnectionInfo() {
446 return connectionInfo;
450 public boolean isActive() {
451 return channel.isActive();
455 public void disconnect() {
456 channel.disconnect();
457 executorService.shutdown();
461 public boolean isConnectionPublished() {
462 return isConnectionPublished;
466 public void setConnectionPublished(final boolean connectionPublished) {
467 isConnectionPublished = connectionPublished;