Refactor DataStore readiness tracking
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 2cc30a271cec446c3a05bbb546792d65f9511b66..52f642c98c4991f66e2cf2ba4a3de60619c9d84f 100644 (file)
@@ -35,6 +35,7 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -116,7 +116,7 @@ import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -156,14 +156,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreContextFactory datastoreContextFactory;
 
-    private final CountDownLatch waitTillReadyCountdownLatch;
+    private final SettableFuture<Void> readinessFuture;
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
     @VisibleForTesting
     final ShardPeerAddressResolver peerAddressResolver;
 
-    private SchemaContext schemaContext;
+    private EffectiveModelContext schemaContext;
 
     private DatastoreSnapshot restoreFromSnapshot;
 
@@ -187,7 +187,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
+        this.readinessFuture = builder.getReadinessFuture();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -279,7 +279,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot();
+            onGetSnapshot((GetSnapshot) message);
         } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof ChangeShardMembersVotingStatus) {
@@ -448,7 +448,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -491,7 +491,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -535,7 +535,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
 
-            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Boolean result) {
                     if (failure == null) {
@@ -563,7 +563,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
@@ -588,7 +588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }
 
@@ -761,10 +761,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void checkReady() {
         if (isReadyWithLeaderId()) {
-            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-            waitTillReadyCountdownLatch.countDown();
+            LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
+            readinessFuture.set(null);
         }
     }
 
@@ -1141,7 +1139,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
-        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+        schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
 
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
 
@@ -1234,7 +1232,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1528,7 +1526,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
@@ -1732,7 +1730,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
                     Timeout.apply(30, TimeUnit.SECONDS));
 
-            future.onComplete(new OnComplete<Object>() {
+            future.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Object response) {
                     if (failure != null) {
@@ -1781,7 +1779,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1827,7 +1825,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 shardReplicaOperationsInProgress.remove(shardName);