2 * Copyright © 2014, 2017 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.lib.impl;
11 import com.fasterxml.jackson.databind.JsonNode;
12 import com.fasterxml.jackson.databind.node.ObjectNode;
13 import com.google.common.base.Function;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Maps;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import io.netty.channel.Channel;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
28 import java.util.UUID;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
36 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
37 import org.opendaylight.ovsdb.lib.LockStolenCallback;
38 import org.opendaylight.ovsdb.lib.MonitorCallBack;
39 import org.opendaylight.ovsdb.lib.MonitorHandle;
40 import org.opendaylight.ovsdb.lib.OvsdbClient;
41 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
42 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
43 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
44 import org.opendaylight.ovsdb.lib.error.ParsingException;
45 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
46 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
47 import org.opendaylight.ovsdb.lib.message.TableUpdate;
48 import org.opendaylight.ovsdb.lib.message.TableUpdates;
49 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
50 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
51 import org.opendaylight.ovsdb.lib.notation.Row;
52 import org.opendaylight.ovsdb.lib.operations.Operation;
53 import org.opendaylight.ovsdb.lib.operations.OperationResult;
54 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
55 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
56 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
57 import org.opendaylight.ovsdb.lib.schema.TableSchema;
58 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
59 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
60 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
65 public class OvsdbClientImpl implements OvsdbClient {
67 private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
68 private ExecutorService executorService;
70 private Map<String, DatabaseSchema> schemas = new HashMap<>();
71 private Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
72 private OvsdbRPC.Callback rpcCallback;
73 private OvsdbConnectionInfo connectionInfo;
74 private Channel channel;
75 private boolean isConnectionPublished;
76 private static final int NO_TIMEOUT = -1;
78 private static final ThreadFactory THREAD_FACTORY_SSL =
79 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
80 private static final ThreadFactory THREAD_FACTORY_NON_SSL =
81 new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
83 public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type,
84 SocketConnectionType socketConnType) {
86 ThreadFactory threadFactory =
87 getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
88 this.executorService = Executors.newCachedThreadPool(threadFactory);
89 this.channel = channel;
90 this.connectionInfo = new OvsdbConnectionInfo(channel, type);
94 * Generate the threadFactory based on ACTIVE, PASSIVE (SSL/NON-SSL) connection type.
95 * @param type ACTIVE or PASSIVE {@link ConnectionType}
96 * @param socketConnType SSL or NON-SSL {@link SocketConnectionType}
97 * @param executorNameArgs Additional args to append to thread name format
98 * @return {@link ThreadFactory}
100 private ThreadFactory getThreadFactory(ConnectionType type,
101 SocketConnectionType socketConnType, String executorNameArgs) {
102 if (type == ConnectionType.PASSIVE) {
103 switch (socketConnType) {
105 return THREAD_FACTORY_SSL;
107 return THREAD_FACTORY_NON_SSL;
109 return Executors.defaultThreadFactory();
111 } else if (type == ConnectionType.ACTIVE) {
112 ThreadFactory threadFactorySSL =
113 new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConn-" + executorNameArgs + "-%d")
115 return threadFactorySSL;
118 return Executors.defaultThreadFactory();
124 void setupUpdateListener() {
125 if (rpcCallback == null) {
126 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
128 public void update(Object node, UpdateNotification updateNotification) {
129 String key = updateNotification.getContext();
130 CallbackContext callbackContext = monitorCallbacks.get(key);
131 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
132 if (monitorCallBack == null) {
134 LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
137 TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
138 callbackContext.schema);
139 monitorCallBack.update(updates, callbackContext.schema);
143 public void locked(Object node, List<String> ids) {
148 public void stolen(Object node, List<String> ids) {
152 this.rpcCallback = temp;
153 rpc.registerCallback(temp);
158 protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
159 //todo(ashwin): we should move all the JSON parsing logic to a utility class
160 if (tableUpdatesJson instanceof ObjectNode) {
161 Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
162 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
163 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
164 Map.Entry<String, JsonNode> entry = itr.next();
166 DatabaseSchema databaseSchema = this.schemas.get(dbSchema.getName());
167 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
168 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
171 return new TableUpdates(tableUpdateMap);
177 public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
179 //todo, we may not need transactionbuilder if we can have JSON objects
180 TransactBuilder builder = new TransactBuilder(dbSchema);
181 for (Operation operation : operations) {
182 builder.addOperation(operation);
185 return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
189 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
190 List<MonitorRequest> monitorRequest,
191 final MonitorCallBack callback) {
192 return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
196 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
197 List<MonitorRequest> monitorRequest,
198 final MonitorCallBack callback,
201 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
202 MonitorRequest::getTableName);
204 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
205 registerCallback(monitorHandle, callback, dbSchema);
207 ListenableFuture<JsonNode> monitor = rpc.monitor(
208 () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
211 if (timeout == NO_TIMEOUT) {
212 result = monitor.get();
214 result = monitor.get(timeout, TimeUnit.SECONDS);
216 } catch (InterruptedException | ExecutionException | TimeoutException e) {
217 LOG.warn("Failed to monitor {}", dbSchema, e);
220 return transformingCallback(result, dbSchema);
224 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
225 List<MonitorRequest> monitorRequest,
226 final MonitorHandle monitorHandle,
227 final MonitorCallBack callback) {
228 return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
232 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
233 List<MonitorRequest> monitorRequest,
234 final MonitorHandle monitorHandle,
235 final MonitorCallBack callback,
238 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
239 MonitorRequest::getTableName);
241 registerCallback(monitorHandle, callback, dbSchema);
243 ListenableFuture<JsonNode> monitor = rpc.monitor(
244 () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
247 if (timeout == NO_TIMEOUT) {
248 result = monitor.get();
250 result = monitor.get(timeout, TimeUnit.SECONDS);
252 } catch (InterruptedException | ExecutionException | TimeoutException e) {
253 LOG.warn("Failed to monitor {}", dbSchema, e);
256 return transformingCallback(result, dbSchema);
259 private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
260 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
261 setupUpdateListener();
265 public void cancelMonitor(final MonitorHandle handler) {
266 cancelMonitor(handler, NO_TIMEOUT);
270 public void cancelMonitor(final MonitorHandle handler, int timeout) {
271 ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
273 JsonNode result = null;
275 if (timeout == NO_TIMEOUT) {
276 result = cancelMonitor.get();
278 result = cancelMonitor.get(timeout, TimeUnit.SECONDS);
280 } catch (InterruptedException | ExecutionException | TimeoutException e) {
281 LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
284 if (result == null) {
285 LOG.error("Fail to cancel monitor with handler {}", handler.getId());
287 LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
// Echo request; yields a future of a list of strings.
// NOTE(review): the body of this method is not visible in this listing; presumably it forwards to
// the RPC proxy. Confirm against the full file before relying on this.
292 public ListenableFuture<List<String>> echo() {
297 public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
298 throw new UnsupportedOperationException("not yet implemented");
302 public ListenableFuture<Boolean> steal(String lockId) {
303 throw new UnsupportedOperationException("not yet implemented");
307 public ListenableFuture<Boolean> unLock(String lockId) {
308 throw new UnsupportedOperationException("not yet implemented");
312 public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
313 throw new UnsupportedOperationException("not yet implemented");
317 public void stopEchoService() {
318 throw new UnsupportedOperationException("not yet implemented");
322 public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
323 return new TransactionBuilder(this, dbSchema);
// Readiness check bounded by `timeout`: loops while the remaining timeout is positive and tests
// whether the schema cache has at least one entry.
// NOTE(review): the rest of the loop body and the final return are not visible in this listing.
// The wait between polls and the unit of `timeout` cannot be determined from here.
327 public boolean isReady(int timeout) throws InterruptedException {
328 while (timeout > 0) {
// A non-empty schema cache is the readiness condition tested here.
329 if (!schemas.isEmpty()) {
339 public ListenableFuture<List<String>> getDatabases() {
340 return rpc.list_dbs();
// Returns a future for the schema of the named database.
// The local `schemas` cache is consulted first; when it has no entry the schema is requested via
// getSchemaFromDevice and the fetched map is transformed by the lambda below.
// NOTE(review): several lines of this method are not visible in this listing (the lambda's return
// statements, the handling of a result without the requested key, and the executor argument of
// Futures.transform). Confirm against the full file.
344 public ListenableFuture<DatabaseSchema> getSchema(final String database) {
346 DatabaseSchema databaseSchema = schemas.get(database);
// No cached schema for this name: go to the device.
348 if (databaseSchema == null) {
349 return Futures.transform(
350 getSchemaFromDevice(Collections.singletonList(database)),
351 (Function<Map<String, DatabaseSchema>, DatabaseSchema>) result -> {
352 if (result.containsKey(database)) {
353 DatabaseSchema dbSchema = result.get(database);
// Populate the internally generated columns before the schema is put in the cache.
354 dbSchema.populateInternallyGeneratedColumns();
// Cache the fetched schema under the database name for later lookups.
355 OvsdbClientImpl.this.schemas.put(database, dbSchema);
// Presumably the cached-schema branch: wrap the cached value in an already-completed future.
362 return Futures.immediateFuture(databaseSchema);
366 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
367 Map<String, DatabaseSchema> schema = new HashMap<>();
368 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
369 populateSchema(dbNames, schema, future);
373 private void populateSchema(final List<String> dbNames,
374 final Map<String, DatabaseSchema> schema,
375 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
377 if (dbNames == null || dbNames.isEmpty()) {
381 Futures.transform(rpc.get_schema(Collections.singletonList(dbNames.get(0))), jsonNode -> {
383 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
384 if (schema.size() > 1 && !sfuture.isCancelled()) {
385 populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
386 } else if (schema.size() == 1) {
389 } catch (ParsingException e) {
390 LOG.warn("Failed to populate schema {}:{}", dbNames, schema, e);
391 sfuture.setException(e);
394 }, MoreExecutors.directExecutor());
397 public void setRpc(OvsdbRPC rpc) {
401 static class CallbackContext {
402 MonitorCallBack monitorCallBack;
403 DatabaseSchema schema;
405 CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
406 this.monitorCallBack = monitorCallBack;
407 this.schema = schema;
412 public DatabaseSchema getDatabaseSchema(String dbName) {
413 return schemas.get(dbName);
417 * This method finds the DatabaseSchema that matches a given Typed Table Class.
418 * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
419 * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
421 * @param klazz Typed Class that represents a Table
422 * @return DatabaseSchema that matches a Typed Table Class
424 private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
425 TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
426 if (typedTable != null) {
427 return this.getDatabaseSchema(typedTable.database());
433 * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
434 * given the Typed Table Class.
436 * @param klazz Typed Interface
437 * @return Proxy wrapper for the actual raw Row class.
440 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
441 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
442 return this.createTypedRowWrapper(dbSchema, klazz);
446 * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
447 * DatabaseSchema and Typed Table Class.
449 * @param dbSchema Database Schema of interest
450 * @param klazz Typed Interface
451 * @return Proxy wrapper for the actual raw Row class.
454 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
455 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<>());
459 * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
461 * @param klazz Typed Interface
462 * @param row The actual Row that the wrapper is operating on.
463 * It can be null if the caller is just interested in getting ColumnSchema.
464 * @return Proxy wrapper for the actual raw Row class.
468 public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
469 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
470 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
474 public OvsdbConnectionInfo getConnectionInfo() {
475 return connectionInfo;
479 public boolean isActive() {
480 return channel.isActive();
484 public void disconnect() {
485 channel.disconnect();
486 executorService.shutdown();
490 public boolean isConnectionPublished() {
491 return isConnectionPublished;
495 public void setConnectionPublished(boolean connectionPublished) {
496 isConnectionPublished = connectionPublished;