Use VarHandles.fullFence() instead of an 'updated' field 49/109449/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 28 Dec 2023 21:10:15 +0000 (22:10 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 28 Dec 2023 21:17:22 +0000 (22:17 +0100)
Upgraded SpotBugs does not like an otherwise-unused field. Preempt
failures by using VarHandles.fullFence() instead.

Also modernize ActorUtils by using instanceof patterns and local
variable type inference.

Change-Id: Iec9b717eaf821657f3ced659e0c6e86f360b21c5
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java

index 1c7809e626d241df8d202f0953fe82eb4752357d..ba3f9c5c7e074d9f3897808cc83abe22fc6ad86a 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import akka.actor.Address;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
@@ -23,6 +22,7 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import java.lang.invoke.VarHandle;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
@@ -102,15 +102,13 @@ public class ActorUtils {
     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
         @Override
         public Throwable apply(final Throwable failure) {
-            Throwable actualFailure = failure;
             if (failure instanceof AskTimeoutException) {
                 // A timeout exception most likely means the shard isn't initialized.
-                actualFailure = new NotInitializedException(
+                return new NotInitializedException(
                         "Timed out trying to find the primary shard. Most likely cause is the "
                         + "shard is not initialized yet.");
             }
-
-            return actualFailure;
+            return failure;
         }
     };
     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
@@ -133,10 +131,6 @@ public class ActorUtils {
 
     private volatile EffectiveModelContext schemaContext;
 
-    // Used as a write memory barrier.
-    @SuppressWarnings("unused")
-    private volatile boolean updated;
-
     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
             .getMetricsRegistry();
 
@@ -163,7 +157,7 @@ public class ActorUtils {
 
         setCachedProperties();
 
-        Address selfAddress = clusterWrapper.getSelfAddress();
+        final var selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
         } else {
@@ -178,7 +172,7 @@ public class ActorUtils {
             TimeUnit.MILLISECONDS);
         operationTimeout = new Timeout(operationDuration);
 
-        transactionCommitOperationTimeout =  new Timeout(FiniteDuration.create(
+        transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
 
         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
@@ -216,14 +210,12 @@ public class ActorUtils {
         datastoreContext = contextFactory.getBaseDatastoreContext();
         setCachedProperties();
 
-        // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
-        // will be published immediately even though they may not be immediately visible to other
-        // threads due to unsynchronized reads. That's OK though - we're going for eventual
-        // consistency here as immediately visible updates to these members aren't critical. These
-        // members could've been made volatile but wanted to avoid volatile reads as these are
-        // accessed often and updates will be infrequent.
-
-        updated = true;
+        // Trigger a write memory barrier so that the writes above will be published immediately even though they may
+        // not be immediately visible to other threads due to unsynchronized reads. That is OK though - we are going for
+        // eventual consistency here as immediately visible updates to these members are not critical. These members
+        // could have been made volatile but wanted to avoid volatile reads as these are accessed often and updates will
+        // be infrequent.
+        VarHandle.fullFence();
 
         if (shardManager != null) {
             shardManager.tell(contextFactory, ActorRef.noSender());
@@ -235,43 +227,40 @@ public class ActorUtils {
     }
 
     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
-        Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
+        final var ret = primaryShardInfoCache.getIfPresent(shardName);
         if (ret != null) {
             return ret;
         }
-        Future<Object> future = executeOperationAsync(shardManager,
-                new FindPrimary(shardName, true), shardInitializationTimeout);
-
-        return future.transform(new Mapper<Object, PrimaryShardInfo>() {
-            @Override
-            public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
-                if (response instanceof RemotePrimaryShardFound) {
-                    LOG.debug("findPrimaryShardAsync received: {}", response);
-                    RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
-                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
-                } else if (response instanceof LocalPrimaryShardFound) {
-                    LOG.debug("findPrimaryShardAsync received: {}", response);
-                    LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
-                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
+
+        return executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout)
+            .transform(new Mapper<>() {
+                @Override
+                public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
+                    if (response instanceof RemotePrimaryShardFound found) {
+                        LOG.debug("findPrimaryShardAsync received: {}", found);
+                        return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
+                    } else if (response instanceof LocalPrimaryShardFound found) {
+                        LOG.debug("findPrimaryShardAsync received: {}", found);
+                        return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
                             found.getLocalShardDataTree());
-                } else if (response instanceof NotInitializedException) {
-                    throw (NotInitializedException)response;
-                } else if (response instanceof PrimaryNotFoundException) {
-                    throw (PrimaryNotFoundException)response;
-                } else if (response instanceof NoShardLeaderException) {
-                    throw (NoShardLeaderException)response;
-                }
+                    } else if (response instanceof NotInitializedException notInitialized) {
+                        throw notInitialized;
+                    } else if (response instanceof PrimaryNotFoundException primaryNotFound) {
+                        throw primaryNotFound;
+                    } else if (response instanceof NoShardLeaderException noShardLeader) {
+                        throw noShardLeader;
+                    }
 
-                throw new UnknownMessageException(String.format(
+                    throw new UnknownMessageException(String.format(
                         "FindPrimary returned unkown response: %s", response));
-            }
-        }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
+                }
+            }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
-        ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
-        PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
+        final var actorSelection = actorSystem.actorSelection(primaryActorPath);
+        final var info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
         primaryShardInfoCache.putSuccessful(shardName, info);
         return info;
@@ -285,10 +274,8 @@ public class ActorUtils {
      *         specified by the shardName
      */
     public Optional<ActorRef> findLocalShard(final String shardName) {
-        Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
-
-        if (result instanceof LocalShardFound) {
-            LocalShardFound found = (LocalShardFound) result;
+        final var result = executeOperation(shardManager, new FindLocalShard(shardName, false));
+        if (result instanceof LocalShardFound found) {
             LOG.debug("Local shard found {}", found.getPath());
             return Optional.of(found.getPath());
         }
@@ -303,27 +290,23 @@ public class ActorUtils {
      * @param shardName the name of the local shard that needs to be found
      */
     public Future<ActorRef> findLocalShardAsync(final String shardName) {
-        Future<Object> future = executeOperationAsync(shardManager,
-                new FindLocalShard(shardName, true), shardInitializationTimeout);
-
-        return future.map(new Mapper<Object, ActorRef>() {
-            @Override
-            public ActorRef checkedApply(final Object response) throws Throwable {
-                if (response instanceof LocalShardFound) {
-                    LocalShardFound found = (LocalShardFound)response;
-                    LOG.debug("Local shard found {}", found.getPath());
-                    return found.getPath();
-                } else if (response instanceof NotInitializedException) {
-                    throw (NotInitializedException)response;
-                } else if (response instanceof LocalShardNotFound) {
-                    throw new LocalShardNotFoundException(
+        return executeOperationAsync(shardManager, new FindLocalShard(shardName, true), shardInitializationTimeout)
+            .map(new Mapper<>() {
+                @Override
+                public ActorRef checkedApply(final Object response) throws Throwable {
+                    if (response instanceof LocalShardFound found) {
+                        LOG.debug("Local shard found {}", found.getPath());
+                        return found.getPath();
+                    } else if (response instanceof NotInitializedException) {
+                        throw (NotInitializedException)response;
+                    } else if (response instanceof LocalShardNotFound) {
+                        throw new LocalShardNotFoundException(
                             String.format("Local shard for %s does not exist.", shardName));
-                }
+                    }
 
-                throw new UnknownMessageException(String.format(
-                        "FindLocalShard returned unkown response: %s", response));
-            }
-        }, getClientDispatcher());
+                    throw new UnknownMessageException("FindLocalShard returned unkown response: " + response);
+                }
+            }, getClientDispatcher());
     }
 
     /**
@@ -419,7 +402,7 @@ public class ActorUtils {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     public void shutdown() {
-        FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
+        final var duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
         try {
             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
         } catch (Exception e) {
@@ -441,15 +424,15 @@ public class ActorUtils {
     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
         for (final String shardName : configuration.getAllShardNames()) {
 
-            Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
-            primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
+            final var primaryFuture = findPrimaryShardAsync(shardName);
+            primaryFuture.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
                     if (failure != null) {
                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
                             shardName, failure);
                     } else {
-                        Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
+                        final var message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
                     }
                 }
@@ -483,7 +466,7 @@ public class ActorUtils {
                 return false;
             }
 
-            String hostPort = path.substring(pathAtIndex + 1, slashIndex);
+            final var hostPort = path.substring(pathAtIndex + 1, slashIndex);
             return hostPort.equals(selfAddressHostPort);
 
         } else {
@@ -504,8 +487,8 @@ public class ActorUtils {
     }
 
     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
-        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
-                operationName, METRIC_RATE);
+        final var rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName,
+            METRIC_RATE);
         return metricRegistry.timer(rate);
     }
 
@@ -578,7 +561,7 @@ public class ActorUtils {
     }
 
     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
-        final Future<Object> ret = ask(actorRef, message, timeout);
+        final var ret = ask(actorRef, message, timeout);
         ret.onComplete(askTimeoutCounter, askTimeoutCounter);
         return ret;
     }