* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.ovsdb.lib.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class OvsdbClientImpl implements OvsdbClient {
private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
private ExecutorService executorService;
private OvsdbRPC rpc;
- private Map<String, DatabaseSchema> schemas = new HashMap<>();
- private Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
+ private final Map<String, DatabaseSchema> schemas = new HashMap<>();
+ private final Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
private OvsdbRPC.Callback rpcCallback;
private OvsdbConnectionInfo connectionInfo;
private Channel channel;
private static final ThreadFactory THREAD_FACTORY_NON_SSL =
new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
- public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type,
- SocketConnectionType socketConnType) {
+ public OvsdbClientImpl(final OvsdbRPC rpc, final Channel channel, final ConnectionType type,
+ final SocketConnectionType socketConnType) {
this.rpc = rpc;
ThreadFactory threadFactory =
getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
* @param executorNameArgs Additional args to append to thread name format
* @return {@link ThreadFactory}
*/
- private ThreadFactory getThreadFactory(ConnectionType type,
- SocketConnectionType socketConnType, String executorNameArgs) {
+ private ThreadFactory getThreadFactory(final ConnectionType type,
+ final SocketConnectionType socketConnType, final String executorNameArgs) {
if (type == ConnectionType.PASSIVE) {
switch (socketConnType) {
case SSL:
if (rpcCallback == null) {
OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
@Override
- public void update(Object node, UpdateNotification updateNotification) {
+ public void update(final Object node, final UpdateNotification updateNotification) {
String key = updateNotification.getContext();
CallbackContext callbackContext = monitorCallbacks.get(key);
MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
}
@Override
- public void locked(Object node, List<String> ids) {
+ public void locked(final Object node, final List<String> ids) {
}
@Override
- public void stolen(Object node, List<String> ids) {
+ public void stolen(final Object node, final List<String> ids) {
}
};
}
- protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
+ protected TableUpdates transformingCallback(final JsonNode tableUpdatesJson, final DatabaseSchema dbSchema) {
//todo(ashwin): we should move all the JSON parsing logic to a utility class
if (tableUpdatesJson instanceof ObjectNode) {
Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
}
@Override
- public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
+ public ListenableFuture<List<OperationResult>> transact(final DatabaseSchema dbSchema,
+ final List<Operation> operations) {
//todo, we may not need transactionbuilder if we can have JSON objects
TransactBuilder builder = new TransactBuilder(dbSchema);
@Override
public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
- List<MonitorRequest> monitorRequest,
+ final List<MonitorRequest> monitorRequest,
final MonitorCallBack callback) {
return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
}
@Override
public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
- List<MonitorRequest> monitorRequest,
+ final List<MonitorRequest> monitorRequest,
final MonitorCallBack callback,
- int timeout) {
+ final int timeout) {
final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
MonitorRequest::getTableName);
@Override
public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
- List<MonitorRequest> monitorRequest,
+ final List<MonitorRequest> monitorRequest,
final MonitorHandle monitorHandle,
final MonitorCallBack callback) {
return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
@Override
public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
- List<MonitorRequest> monitorRequest,
+ final List<MonitorRequest> monitorRequest,
final MonitorHandle monitorHandle,
final MonitorCallBack callback,
- int timeout) {
+ final int timeout) {
final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
MonitorRequest::getTableName);
return transformingCallback(result, dbSchema);
}
- private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
+ private void registerCallback(final MonitorHandle monitorHandle, final MonitorCallBack callback,
+ final DatabaseSchema schema) {
this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
setupUpdateListener();
}
}
@Override
- public void cancelMonitor(final MonitorHandle handler, int timeout) {
+ public void cancelMonitor(final MonitorHandle handler, final int timeout) {
ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
JsonNode result = null;
}
@Override
- public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
+ public void lock(final String lockId, final LockAquisitionCallback lockedCallBack,
+ final LockStolenCallback stolenCallback) {
throw new UnsupportedOperationException("not yet implemented");
}
@Override
- public ListenableFuture<Boolean> steal(String lockId) {
+ public ListenableFuture<Boolean> steal(final String lockId) {
throw new UnsupportedOperationException("not yet implemented");
}
@Override
- public ListenableFuture<Boolean> unLock(String lockId) {
+ public ListenableFuture<Boolean> unLock(final String lockId) {
throw new UnsupportedOperationException("not yet implemented");
}
@Override
- public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
+ public void startEchoService(final EchoServiceCallbackFilters callbackFilters) {
throw new UnsupportedOperationException("not yet implemented");
}
}
@Override
- public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
+ public TransactionBuilder transactBuilder(final DatabaseSchema dbSchema) {
return new TransactionBuilder(this, dbSchema);
}
@Override
public ListenableFuture<DatabaseSchema> getSchema(final String database) {
-
- DatabaseSchema databaseSchema = schemas.get(database);
-
- if (databaseSchema == null) {
- return Futures.transform(
- getSchemaFromDevice(Collections.singletonList(database)),
- (Function<Map<String, DatabaseSchema>, DatabaseSchema>) result -> {
- if (result.containsKey(database)) {
- DatabaseSchema dbSchema = result.get(database);
- dbSchema.populateInternallyGeneratedColumns();
- OvsdbClientImpl.this.schemas.put(database, dbSchema);
- return dbSchema;
- } else {
- return null;
- }
- }, executorService);
- } else {
- return Futures.immediateFuture(databaseSchema);
+ final DatabaseSchema existing = schemas.get(database);
+ if (existing != null) {
+ return Futures.immediateFuture(existing);
}
+
+ return Futures.transform(getSchemaFromDevice(Collections.singletonList(database)), result -> {
+ final DatabaseSchema dbSchema = result.get(database);
+ if (dbSchema == null) {
+ return null;
+ }
+
+ dbSchema.populateInternallyGeneratedColumns();
+ final DatabaseSchema raced = schemas.putIfAbsent(database, dbSchema);
+ return raced != null ? raced : dbSchema;
+ }, executorService);
}
private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
}, MoreExecutors.directExecutor());
}
- public void setRpc(OvsdbRPC rpc) {
+ public void setRpc(final OvsdbRPC rpc) {
this.rpc = rpc;
}
MonitorCallBack monitorCallBack;
DatabaseSchema schema;
- CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
+ CallbackContext(final MonitorCallBack monitorCallBack, final DatabaseSchema schema) {
this.monitorCallBack = monitorCallBack;
this.schema = schema;
}
}
@Override
- public DatabaseSchema getDatabaseSchema(String dbName) {
+ public DatabaseSchema getDatabaseSchema(final String dbName) {
return schemas.get(dbName);
}
* @param klazz Typed Class that represents a Table
* @return DatabaseSchema that matches a Typed Table Class
*/
- private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
+ private <T> DatabaseSchema getDatabaseSchemaForTypedTable(final Class<T> klazz) {
TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
if (typedTable != null) {
return this.getDatabaseSchema(typedTable.database());
* @return Proxy wrapper for the actual raw Row class.
*/
@Override
- public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
+ public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final Class<T> klazz) {
DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
return this.createTypedRowWrapper(dbSchema, klazz);
}
* @return Proxy wrapper for the actual raw Row class.
*/
@Override
- public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
+ public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final DatabaseSchema dbSchema, final Class<T> klazz) {
return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<>());
}
}
@Override
- public void setConnectionPublished(boolean connectionPublished) {
+ public void setConnectionPublished(final boolean connectionPublished) {
isConnectionPublished = connectionPublished;
}
}