Improve schemas population 28/86128/2
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 28 Nov 2019 20:52:24 +0000 (21:52 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 3 Dec 2019 10:19:29 +0000 (11:19 +0100)
Fetching the schema is an asynchronous process, which means we could
end up with competing requests overwriting the schema. Make sure to
re-check presence when the resolution process completes.

Change-Id: I0a7076b7a476394d4815f3c8f544be7a9ffaddd8
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 0faae677b33efe1dd5bec783f8e57c5c0a10cdc4)

library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java

index 7eaaa6ba8548d6adc8324728d94db51a8c30d0bf..72afec33459c88cecb16158d1242f00ed6a34f5f 100644 (file)
@@ -5,12 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.lib.impl;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
@@ -61,14 +59,13 @@ import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class OvsdbClientImpl implements OvsdbClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
     private ExecutorService executorService;
     private OvsdbRPC rpc;
-    private Map<String, DatabaseSchema> schemas = new HashMap<>();
-    private Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
+    private final Map<String, DatabaseSchema> schemas = new HashMap<>();
+    private final Map<String, CallbackContext> monitorCallbacks = new HashMap<>();
     private OvsdbRPC.Callback rpcCallback;
     private OvsdbConnectionInfo connectionInfo;
     private Channel channel;
@@ -80,8 +77,8 @@ public class OvsdbClientImpl implements OvsdbClient {
     private static final ThreadFactory THREAD_FACTORY_NON_SSL =
         new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
 
-    public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type,
-        SocketConnectionType socketConnType) {
+    public OvsdbClientImpl(final OvsdbRPC rpc, final Channel channel, final ConnectionType type,
+        final SocketConnectionType socketConnType) {
         this.rpc = rpc;
         ThreadFactory threadFactory =
             getThreadFactory(type, socketConnType, channel.remoteAddress().toString());
@@ -97,8 +94,8 @@ public class OvsdbClientImpl implements OvsdbClient {
      * @param executorNameArgs Additional args to append to thread name format
      * @return {@link ThreadFactory}
      */
-    private ThreadFactory getThreadFactory(ConnectionType type,
-        SocketConnectionType socketConnType, String executorNameArgs) {
+    private ThreadFactory getThreadFactory(final ConnectionType type,
+        final SocketConnectionType socketConnType, final String executorNameArgs) {
         if (type == ConnectionType.PASSIVE) {
             switch (socketConnType) {
                 case SSL:
@@ -125,7 +122,7 @@ public class OvsdbClientImpl implements OvsdbClient {
         if (rpcCallback == null) {
             OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
                 @Override
-                public void update(Object node, UpdateNotification updateNotification) {
+                public void update(final Object node, final UpdateNotification updateNotification) {
                     String key = updateNotification.getContext();
                     CallbackContext callbackContext = monitorCallbacks.get(key);
                     MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
@@ -140,12 +137,12 @@ public class OvsdbClientImpl implements OvsdbClient {
                 }
 
                 @Override
-                public void locked(Object node, List<String> ids) {
+                public void locked(final Object node, final List<String> ids) {
 
                 }
 
                 @Override
-                public void stolen(Object node, List<String> ids) {
+                public void stolen(final Object node, final List<String> ids) {
 
                 }
             };
@@ -155,7 +152,7 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
 
-    protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
+    protected TableUpdates transformingCallback(final JsonNode tableUpdatesJson, final DatabaseSchema dbSchema) {
         //todo(ashwin): we should move all the JSON parsing logic to a utility class
         if (tableUpdatesJson instanceof ObjectNode) {
             Map<String, TableUpdate> tableUpdateMap = new HashMap<>();
@@ -174,7 +171,8 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
     @Override
-    public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
+    public ListenableFuture<List<OperationResult>> transact(final DatabaseSchema dbSchema,
+            final List<Operation> operations) {
 
         //todo, we may not need transactionbuilder if we can have JSON objects
         TransactBuilder builder = new TransactBuilder(dbSchema);
@@ -187,16 +185,16 @@ public class OvsdbClientImpl implements OvsdbClient {
 
     @Override
     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
-                                                           List<MonitorRequest> monitorRequest,
+                                                           final List<MonitorRequest> monitorRequest,
                                                            final MonitorCallBack callback) {
         return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
     }
 
     @Override
     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
-                                                            List<MonitorRequest> monitorRequest,
+                                                            final List<MonitorRequest> monitorRequest,
                                                             final MonitorCallBack callback,
-                                                            int timeout) {
+                                                            final int timeout) {
 
         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
                 MonitorRequest::getTableName);
@@ -222,7 +220,7 @@ public class OvsdbClientImpl implements OvsdbClient {
 
     @Override
     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
-                                                           List<MonitorRequest> monitorRequest,
+                                                           final List<MonitorRequest> monitorRequest,
                                                            final MonitorHandle monitorHandle,
                                                            final MonitorCallBack callback) {
         return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
@@ -230,10 +228,10 @@ public class OvsdbClientImpl implements OvsdbClient {
 
     @Override
     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
-                                                           List<MonitorRequest> monitorRequest,
+                                                           final List<MonitorRequest> monitorRequest,
                                                            final MonitorHandle monitorHandle,
                                                            final MonitorCallBack callback,
-                                                           int timeout) {
+                                                           final int timeout) {
 
         final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
                 MonitorRequest::getTableName);
@@ -256,7 +254,8 @@ public class OvsdbClientImpl implements OvsdbClient {
         return transformingCallback(result, dbSchema);
     }
 
-    private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
+    private void registerCallback(final MonitorHandle monitorHandle, final MonitorCallBack callback,
+            final DatabaseSchema schema) {
         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
         setupUpdateListener();
     }
@@ -267,7 +266,7 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
     @Override
-    public void cancelMonitor(final MonitorHandle handler, int timeout) {
+    public void cancelMonitor(final MonitorHandle handler, final int timeout) {
         ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
 
         JsonNode result = null;
@@ -294,22 +293,23 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
     @Override
-    public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
+    public void lock(final String lockId, final LockAquisitionCallback lockedCallBack,
+            final LockStolenCallback stolenCallback) {
         throw new UnsupportedOperationException("not yet implemented");
     }
 
     @Override
-    public ListenableFuture<Boolean> steal(String lockId) {
+    public ListenableFuture<Boolean> steal(final String lockId) {
         throw new UnsupportedOperationException("not yet implemented");
     }
 
     @Override
-    public ListenableFuture<Boolean> unLock(String lockId) {
+    public ListenableFuture<Boolean> unLock(final String lockId) {
         throw new UnsupportedOperationException("not yet implemented");
     }
 
     @Override
-    public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
+    public void startEchoService(final EchoServiceCallbackFilters callbackFilters) {
         throw new UnsupportedOperationException("not yet implemented");
     }
 
@@ -319,7 +319,7 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
     @Override
-    public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
+    public TransactionBuilder transactBuilder(final DatabaseSchema dbSchema) {
         return new TransactionBuilder(this, dbSchema);
     }
 
@@ -342,25 +342,21 @@ public class OvsdbClientImpl implements OvsdbClient {
 
     @Override
     public ListenableFuture<DatabaseSchema> getSchema(final String database) {
-
-        DatabaseSchema databaseSchema = schemas.get(database);
-
-        if (databaseSchema == null) {
-            return Futures.transform(
-                getSchemaFromDevice(Collections.singletonList(database)),
-                (Function<Map<String, DatabaseSchema>, DatabaseSchema>) result -> {
-                    if (result.containsKey(database)) {
-                        DatabaseSchema dbSchema = result.get(database);
-                        dbSchema.populateInternallyGeneratedColumns();
-                        OvsdbClientImpl.this.schemas.put(database, dbSchema);
-                        return dbSchema;
-                    } else {
-                        return null;
-                    }
-                }, executorService);
-        } else {
-            return Futures.immediateFuture(databaseSchema);
+        final DatabaseSchema existing = schemas.get(database);
+        if (existing != null) {
+            return Futures.immediateFuture(existing);
         }
+
+        return Futures.transform(getSchemaFromDevice(Collections.singletonList(database)), result -> {
+            final DatabaseSchema dbSchema = result.get(database);
+            if (dbSchema == null) {
+                return null;
+            }
+
+            dbSchema.populateInternallyGeneratedColumns();
+            final DatabaseSchema raced = schemas.putIfAbsent(database, dbSchema);
+            return raced != null ? raced : dbSchema;
+        }, executorService);
     }
 
     private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
@@ -394,7 +390,7 @@ public class OvsdbClientImpl implements OvsdbClient {
         }, MoreExecutors.directExecutor());
     }
 
-    public void setRpc(OvsdbRPC rpc) {
+    public void setRpc(final OvsdbRPC rpc) {
         this.rpc = rpc;
     }
 
@@ -402,14 +398,14 @@ public class OvsdbClientImpl implements OvsdbClient {
         MonitorCallBack monitorCallBack;
         DatabaseSchema schema;
 
-        CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
+        CallbackContext(final MonitorCallBack monitorCallBack, final DatabaseSchema schema) {
             this.monitorCallBack = monitorCallBack;
             this.schema = schema;
         }
     }
 
     @Override
-    public DatabaseSchema getDatabaseSchema(String dbName) {
+    public DatabaseSchema getDatabaseSchema(final String dbName) {
         return schemas.get(dbName);
     }
 
@@ -421,7 +417,7 @@ public class OvsdbClientImpl implements OvsdbClient {
      * @param klazz Typed Class that represents a Table
      * @return DatabaseSchema that matches a Typed Table Class
      */
-    private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
+    private <T> DatabaseSchema getDatabaseSchemaForTypedTable(final Class<T> klazz) {
         TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
         if (typedTable != null) {
             return this.getDatabaseSchema(typedTable.database());
@@ -437,7 +433,7 @@ public class OvsdbClientImpl implements OvsdbClient {
      * @return Proxy wrapper for the actual raw Row class.
      */
     @Override
-    public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
+    public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final Class<T> klazz) {
         DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
         return this.createTypedRowWrapper(dbSchema, klazz);
     }
@@ -451,7 +447,7 @@ public class OvsdbClientImpl implements OvsdbClient {
      * @return Proxy wrapper for the actual raw Row class.
      */
     @Override
-    public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
+    public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final DatabaseSchema dbSchema, final Class<T> klazz) {
         return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<>());
     }
 
@@ -492,7 +488,7 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
     @Override
-    public void setConnectionPublished(boolean connectionPublished) {
+    public void setConnectionPublished(final boolean connectionPublished) {
         isConnectionPublished = connectionPublished;
     }
 }