2 * Copyright © 2014, 2017 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.lib.impl;
10 import com.fasterxml.jackson.databind.JsonNode;
11 import com.fasterxml.jackson.databind.node.ObjectNode;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import io.netty.channel.Channel;
20 import java.util.Arrays;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.UUID;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
34 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
35 import org.opendaylight.ovsdb.lib.LockStolenCallback;
36 import org.opendaylight.ovsdb.lib.MonitorCallBack;
37 import org.opendaylight.ovsdb.lib.MonitorHandle;
38 import org.opendaylight.ovsdb.lib.OvsdbClient;
39 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
40 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
41 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
42 import org.opendaylight.ovsdb.lib.error.ParsingException;
43 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
44 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
45 import org.opendaylight.ovsdb.lib.message.TableUpdate;
46 import org.opendaylight.ovsdb.lib.message.TableUpdates;
47 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
48 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
49 import org.opendaylight.ovsdb.lib.notation.Row;
50 import org.opendaylight.ovsdb.lib.operations.Operation;
51 import org.opendaylight.ovsdb.lib.operations.OperationResult;
52 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
53 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
54 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
55 import org.opendaylight.ovsdb.lib.schema.TableSchema;
56 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
57 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
58 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 public class OvsdbClientImpl implements OvsdbClient {
64 private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
65 private ExecutorService executorService;
67 private final Map<String, DatabaseSchema> schemas = new HashMap<>();
68 private final Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
69 private OvsdbRPC.Callback rpcCallback;
70 private OvsdbConnectionInfo connectionInfo;
71 private Channel channel;
72 private boolean isConnectionPublished;
73 private static final int NO_TIMEOUT = -1;
75 private static final ThreadFactory THREAD_FACTORY_SSL =
76 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
77 private static final ThreadFactory THREAD_FACTORY_NON_SSL =
78 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
80 public OvsdbClientImpl(final OvsdbRPC rpc, final Channel channel, final ConnectionType type,
81 final SocketConnectionType socketConnType) {
83 ThreadFactory threadFactory =
84 getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
85 this.executorService = Executors.newCachedThreadPool(threadFactory);
86 this.channel = channel;
87 this.connectionInfo = new OvsdbConnectionInfo(channel, type);
91 * Generate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
92 * @param type ACTIVE or PASSIVE {@link ConnectionType}
93 * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
94 * @param executorNameArgs Additional args to append to thread name format
95 * @return {@link ThreadFactory}
97 private ThreadFactory getThreadFactory(final ConnectionType type,
98 final SocketConnectionType socketConnType, final String executorNameArgs) {
99 if (type == ConnectionType.PASSIVE) {
100 switch (socketConnType) {
102 return THREAD_FACTORY_SSL;
104 return THREAD_FACTORY_NON_SSL;
106 return Executors.defaultThreadFactory();
108 } else if (type == ConnectionType.ACTIVE) {
109 ThreadFactory threadFactorySSL =
110 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConn-" + executorNameArgs + "-%d")
112 return threadFactorySSL;
115 return Executors.defaultThreadFactory();
121 void setupUpdateListener() {
122 if (rpcCallback == null) {
123 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
125 public void update(final Object node, final UpdateNotification updateNotification) {
126 String key = updateNotification.getContext();
127 CallbackContext callbackContext = monitorCallbacks.get(key);
128 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
129 if (monitorCallBack == null) {
131 LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
134 TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
135 callbackContext.schema);
136 monitorCallBack.update(updates, callbackContext.schema);
140 public void locked(final Object node, final List<String> ids) {
145 public void stolen(final Object node, final List<String> ids) {
149 this.rpcCallback = temp;
150 rpc.registerCallback(temp);
155 protected TableUpdates transformingCallback(final JsonNode tableUpdatesJson, final DatabaseSchema dbSchema) {
156 //todo(ashwin): we should move all the JSON parsing logic to a utility class
157 if (tableUpdatesJson instanceof ObjectNode) {
158 Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
159 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
160 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
161 Map.Entry<String, JsonNode> entry = itr.next();
163 DatabaseSchema databaseSchema = this.schemas.get(dbSchema.getName());
164 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
165 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
168 return new TableUpdates(tableUpdateMap);
174 public ListenableFuture<List<OperationResult>> transact(final DatabaseSchema dbSchema,
175 final List<Operation> operations) {
177 //todo, we may not need transactionbuilder if we can have JSON objects
178 TransactBuilder builder = new TransactBuilder(dbSchema);
179 for (Operation operation : operations) {
180 builder.addOperation(operation);
183 return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
187 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
188 final List<MonitorRequest> monitorRequest,
189 final MonitorCallBack callback) {
190 return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
194 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
195 final List<MonitorRequest> monitorRequest,
196 final MonitorCallBack callback,
199 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
200 MonitorRequest::getTableName);
202 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
203 registerCallback(monitorHandle, callback, dbSchema);
205 ListenableFuture<JsonNode> monitor = rpc.monitor(
206 () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
209 if (timeout == NO_TIMEOUT) {
210 result = monitor.get();
212 result = monitor.get(timeout, TimeUnit.SECONDS);
214 } catch (InterruptedException | ExecutionException | TimeoutException e) {
215 LOG.warn("Failed to monitor {}", dbSchema, e);
218 return transformingCallback(result, dbSchema);
222 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
223 final List<MonitorRequest> monitorRequest,
224 final MonitorHandle monitorHandle,
225 final MonitorCallBack callback) {
226 return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
230 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
231 final List<MonitorRequest> monitorRequest,
232 final MonitorHandle monitorHandle,
233 final MonitorCallBack callback,
236 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
237 MonitorRequest::getTableName);
239 registerCallback(monitorHandle, callback, dbSchema);
241 ListenableFuture<JsonNode> monitor = rpc.monitor(
242 () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
245 if (timeout == NO_TIMEOUT) {
246 result = monitor.get();
248 result = monitor.get(timeout, TimeUnit.SECONDS);
250 } catch (InterruptedException | ExecutionException | TimeoutException e) {
251 LOG.warn("Failed to monitor {}", dbSchema, e);
254 return transformingCallback(result, dbSchema);
257 private void registerCallback(final MonitorHandle monitorHandle, final MonitorCallBack callback,
258 final DatabaseSchema schema) {
259 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
260 setupUpdateListener();
264 public void cancelMonitor(final MonitorHandle handler) {
265 cancelMonitor(handler, NO_TIMEOUT);
269 public void cancelMonitor(final MonitorHandle handler, final int timeout) {
270 ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
272 JsonNode result = null;
274 if (timeout == NO_TIMEOUT) {
275 result = cancelMonitor.get();
277 result = cancelMonitor.get(timeout, TimeUnit.SECONDS);
279 } catch (InterruptedException | ExecutionException | TimeoutException e) {
280 LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
283 if (result == null) {
284 LOG.error("Fail to cancel monitor with handler {}", handler.getId());
286 LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
291 public ListenableFuture<List<String>> echo() {
296 public void lock(final String lockId, final LockAquisitionCallback lockedCallBack,
297 final LockStolenCallback stolenCallback) {
298 throw new UnsupportedOperationException("not yet implemented");
302 public ListenableFuture<Boolean> steal(final String lockId) {
303 throw new UnsupportedOperationException("not yet implemented");
307 public ListenableFuture<Boolean> unLock(final String lockId) {
308 throw new UnsupportedOperationException("not yet implemented");
312 public void startEchoService(final EchoServiceCallbackFilters callbackFilters) {
313 throw new UnsupportedOperationException("not yet implemented");
317 public void stopEchoService() {
318 throw new UnsupportedOperationException("not yet implemented");
322 public TransactionBuilder transactBuilder(final DatabaseSchema dbSchema) {
323 return new TransactionBuilder(this, dbSchema);
327 public boolean isReady(int timeout) throws InterruptedException {
328 while (timeout > 0) {
329 if (!schemas.isEmpty()) {
339 public ListenableFuture<List<String>> getDatabases() {
340 return rpc.list_dbs();
344 public ListenableFuture<DatabaseSchema> getSchema(final String database) {
345 final DatabaseSchema existing = schemas.get(database);
346 if (existing != null) {
347 return Futures.immediateFuture(existing);
350 return Futures.transform(getSchemaFromDevice(Collections.singletonList(database)), result -> {
351 DatabaseSchema dbSchema = result.get(database);
352 if (dbSchema == null) {
356 dbSchema = dbSchema.withInternallyGeneratedColumns();
357 final DatabaseSchema raced = schemas.putIfAbsent(database, dbSchema);
358 return raced != null ? raced : dbSchema;
362 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
363 Map<String, DatabaseSchema> schema = new HashMap<>();
364 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
365 populateSchema(dbNames, schema, future);
369 private void populateSchema(final List<String> dbNames,
370 final Map<String, DatabaseSchema> schema,
371 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
373 if (dbNames == null || dbNames.isEmpty()) {
377 Futures.transform(rpc.get_schema(Collections.singletonList(dbNames.get(0))), jsonNode -> {
379 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
380 if (schema.size() > 1 && !sfuture.isCancelled()) {
381 populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
382 } else if (schema.size() == 1) {
385 } catch (ParsingException e) {
386 LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
387 sfuture.setException(e);
390 }, MoreExecutors.directExecutor());
393 public void setRpc(final OvsdbRPC rpc) {
397 static class CallbackContext {
398 MonitorCallBack monitorCallBack;
399 DatabaseSchema schema;
401 CallbackContext(final MonitorCallBack monitorCallBack, final DatabaseSchema schema) {
402 this.monitorCallBack = monitorCallBack;
403 this.schema = schema;
408 public DatabaseSchema getDatabaseSchema(final String dbName) {
409 return schemas.get(dbName);
413 * This method finds the DatabaseSchema that matches a given Typed Table Class.
414 * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
415 * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
417 * @param klazz Typed Class that represents a Table
418 * @return DatabaseSchema that matches a Typed Table Class
420 private <T> DatabaseSchema getDatabaseSchemaForTypedTable(final Class<T> klazz) {
421 TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
422 if (typedTable != null) {
423 return this.getDatabaseSchema(typedTable.database());
429 * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
430 * given the Typed Table Class.
432 * @param klazz Typed Interface
433 * @return Proxy wrapper for the actual raw Row class.
436 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final Class<T> klazz) {
437 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
438 return this.createTypedRowWrapper(dbSchema, klazz);
442 * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
443 * DatabaseSchema and Typed Table Class.
445 * @param dbSchema Database Schema of interest
446 * @param klazz Typed Interface
447 * @return Proxy wrapper for the actual raw Row class.
450 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final DatabaseSchema dbSchema, final Class<T> klazz) {
451 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<>());
455 * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
457 * @param klazz Typed Interface
458 * @param row The actual Row that the wrapper is operating on.
459 * It can be null if the caller is just interested in getting ColumnSchema.
460 * @return Proxy wrapper for the actual raw Row class.
464 public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
465 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
466 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
470 public OvsdbConnectionInfo getConnectionInfo() {
471 return connectionInfo;
475 public boolean isActive() {
476 return channel.isActive();
480 public void disconnect() {
481 channel.disconnect();
482 executorService.shutdown();
486 public boolean isConnectionPublished() {
487 return isConnectionPublished;
491 public void setConnectionPublished(final boolean connectionPublished) {
492 isConnectionPublished = connectionPublished;