Refactor DataStore readiness tracking
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index aa3cf3face4cd348500eb7d506774f8fecf8ac30..52f642c98c4991f66e2cf2ba4a3de60619c9d84f 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import static akka.pattern.Patterns.ask;
+import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import akka.actor.Address;
@@ -34,7 +35,8 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -44,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -112,12 +113,10 @@ import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHan
 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -139,7 +138,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // Stores a mapping between a shard name and it's corresponding information
     // Shard names look like inventory, topology etc and are as specified in
     // configuration
-    private final Map<String, ShardInformation> localShards = new HashMap<>();
+    @VisibleForTesting
+    final Map<String, ShardInformation> localShards = new HashMap<>();
 
     // The type of a ShardManager reflects the type of the datastore itself
     // A data store could be of type config/operational
@@ -149,19 +149,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Configuration configuration;
 
-    private final String shardDispatcherPath;
+    @VisibleForTesting
+    final String shardDispatcherPath;
 
     private final ShardManagerInfo shardManagerMBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
-    private final CountDownLatch waitTillReadyCountdownLatch;
+    private final SettableFuture<Void> readinessFuture;
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-    private final ShardPeerAddressResolver peerAddressResolver;
+    @VisibleForTesting
+    final ShardPeerAddressResolver peerAddressResolver;
 
-    private SchemaContext schemaContext;
+    private EffectiveModelContext schemaContext;
 
     private DatastoreSnapshot restoreFromSnapshot;
 
@@ -176,7 +178,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private final String persistenceId;
     private final AbstractDataStore dataStore;
 
-    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
     private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
     ShardManager(final AbstractShardManagerCreator<?> builder) {
@@ -186,7 +187,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
+        this.readinessFuture = builder.getReadinessFuture();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -216,11 +217,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.info("Stopping ShardManager {}", persistenceId());
 
         shardManagerMBean.unregisterMBean();
-
-        if (configListenerReg != null) {
-            configListenerReg.close();
-            configListenerReg = null;
-        }
     }
 
     @Override
@@ -283,7 +279,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot();
+            onGetSnapshot((GetSnapshot) message);
         } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof ChangeShardMembersVotingStatus) {
@@ -361,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         configUpdateHandler.initListener(dataStore, datastoreType);
     }
 
-    private void onShutDown() {
+    void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
@@ -430,6 +426,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
                                           final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -450,7 +448,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -471,6 +469,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
             final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -491,7 +491,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -535,7 +535,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
 
-            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Boolean result) {
                     if (failure == null) {
@@ -563,7 +563,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
@@ -588,7 +588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }
 
@@ -761,10 +761,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void checkReady() {
         if (isReadyWithLeaderId()) {
-            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-            waitTillReadyCountdownLatch.countDown();
+            LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
+            readinessFuture.set(null);
         }
     }
 
@@ -1141,7 +1139,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
-        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+        schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
 
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
 
@@ -1234,7 +1232,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1287,18 +1285,27 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
 
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
-                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
-                        shardSnapshots.get(shardName)), peerAddressResolver));
+            localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), shardSnapshots));
         }
     }
 
+    @VisibleForTesting
+    ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
+                                        final Map<String, String> peerAddresses,
+                                        final DatastoreContext datastoreContext,
+                                        final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+        return new ShardInformation(shardName, shardId, peerAddresses,
+                datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+                peerAddressResolver);
+    }
+
     /**
      * Given the name of the shard find the addresses of all it's peers.
      *
      * @param shardName the shard name
      */
-    private Map<String, String> getPeerAddresses(final String shardName) {
+    Map<String, String> getPeerAddresses(final String shardName) {
         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         return getPeerAddresses(shardName, members);
     }
@@ -1424,12 +1431,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
         sender.tell(new Status.Failure(new AlreadyExistsException(
             String.format("Local shard %s already exists", shardName))), getSelf());
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
                                 final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -1464,6 +1475,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
@@ -1513,7 +1526,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
@@ -1717,7 +1730,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
                     Timeout.apply(30, TimeUnit.SECONDS));
 
-            future.onComplete(new OnComplete<Object>() {
+            future.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Object response) {
                     if (failure != null) {
@@ -1766,7 +1779,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1812,7 +1825,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 shardReplicaOperationsInProgress.remove(shardName);
@@ -1960,10 +1973,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
          */
         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
                 final String persistenceId, final ActorRef shardManagerActor) {
-            this.targetActor = Preconditions.checkNotNull(targetActor);
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.persistenceId = Preconditions.checkNotNull(persistenceId);
-            this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+            this.targetActor = requireNonNull(targetActor);
+            this.shardName = requireNonNull(shardName);
+            this.persistenceId = requireNonNull(persistenceId);
+            this.shardManagerActor = requireNonNull(shardManagerActor);
         }
 
         public ActorRef getTargetActor() {