Refactor DataStore readiness tracking
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 77a0e88393d4760cf251de4d1f4df7bcb904f97e..52f642c98c4991f66e2cf2ba4a3de60619c9d84f 100644 (file)
@@ -35,6 +35,8 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -44,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -115,7 +116,7 @@ import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -155,14 +156,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreContextFactory datastoreContextFactory;
 
-    private final CountDownLatch waitTillReadyCountdownLatch;
+    private final SettableFuture<Void> readinessFuture;
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
     @VisibleForTesting
     final ShardPeerAddressResolver peerAddressResolver;
 
-    private SchemaContext schemaContext;
+    private EffectiveModelContext schemaContext;
 
     private DatastoreSnapshot restoreFromSnapshot;
 
@@ -186,7 +187,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
+        this.readinessFuture = builder.getReadinessFuture();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -278,7 +279,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot();
+            onGetSnapshot((GetSnapshot) message);
         } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof ChangeShardMembersVotingStatus) {
@@ -356,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         configUpdateHandler.initListener(dataStore, datastoreType);
     }
 
-    private void onShutDown() {
+    void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
@@ -425,6 +426,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
                                           final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -445,7 +448,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -466,6 +469,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
             final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -486,7 +491,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -530,7 +535,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
 
-            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Boolean result) {
                     if (failure == null) {
@@ -558,7 +563,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
@@ -583,7 +588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }
 
@@ -756,10 +761,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void checkReady() {
         if (isReadyWithLeaderId()) {
-            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-            waitTillReadyCountdownLatch.countDown();
+            LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
+            readinessFuture.set(null);
         }
     }
 
@@ -1136,7 +1139,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
-        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+        schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
 
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
 
@@ -1229,7 +1232,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1288,10 +1291,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId,
-                                        Map<String, String> peerAddresses,
-                                        DatastoreContext datastoreContext,
-                                        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+    ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
+                                        final Map<String, String> peerAddresses,
+                                        final DatastoreContext datastoreContext,
+                                        final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
         return new ShardInformation(shardName, shardId, peerAddresses,
                 datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
                 peerAddressResolver);
@@ -1428,12 +1431,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
         sender.tell(new Status.Failure(new AlreadyExistsException(
             String.format("Local shard %s already exists", shardName))), getSelf());
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
                                 final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -1468,6 +1475,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
@@ -1517,7 +1526,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
@@ -1721,7 +1730,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
                     Timeout.apply(30, TimeUnit.SECONDS));
 
-            future.onComplete(new OnComplete<Object>() {
+            future.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Object response) {
                     if (failure != null) {
@@ -1770,7 +1779,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1816,7 +1825,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 shardReplicaOperationsInProgress.remove(shardName);