Remove DOMDataTreeProducer-related classes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 43446b5ce2ce760d13c29a92e0a38c79acd74d9b..18bcadf1511c2e9bc247586d0d97d68c984f66f3 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import static akka.pattern.Patterns.ask;
+import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import akka.actor.Address;
@@ -34,7 +35,8 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -44,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -52,7 +53,6 @@ import java.util.function.Supplier;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
@@ -108,14 +108,10 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
-import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -155,14 +151,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreContextFactory datastoreContextFactory;
 
-    private final CountDownLatch waitTillReadyCountdownLatch;
+    private final SettableFuture<Void> readinessFuture;
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
     @VisibleForTesting
     final ShardPeerAddressResolver peerAddressResolver;
 
-    private SchemaContext schemaContext;
+    private EffectiveModelContext schemaContext;
 
     private DatastoreSnapshot restoreFromSnapshot;
 
@@ -175,9 +171,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
 
     private final String persistenceId;
-    private final AbstractDataStore dataStore;
-
-    private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
     ShardManager(final AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
@@ -186,7 +179,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
+        this.readinessFuture = builder.getReadinessFuture();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -202,8 +195,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "shard-manager-" + this.type,
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
         shardManagerMBean.registerMBean();
-
-        dataStore = builder.getDistributedDataStore();
     }
 
     @Override
@@ -258,12 +249,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onAddShardReplica((AddShardReplica) message);
         } else if (message instanceof AddPrefixShardReplica) {
             onAddPrefixShardReplica((AddPrefixShardReplica) message);
-        } else if (message instanceof PrefixShardCreated) {
-            onPrefixShardCreated((PrefixShardCreated) message);
-        } else if (message instanceof PrefixShardRemoved) {
-            onPrefixShardRemoved((PrefixShardRemoved) message);
-        } else if (message instanceof InitConfigListener) {
-            onInitConfigListener();
         } else if (message instanceof ForwardedAddServerReply) {
             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
@@ -278,7 +263,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot();
+            onGetSnapshot((GetSnapshot) message);
         } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof ChangeShardMembersVotingStatus) {
@@ -341,22 +326,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
     }
 
-    private void onInitConfigListener() {
-        LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
-
-        final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
-                org.opendaylight.mdsal.common.api.LogicalDatastoreType
-                        .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
-
-        if (configUpdateHandler != null) {
-            configUpdateHandler.close();
-        }
-
-        configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
-        configUpdateHandler.initListener(dataStore, datastoreType);
-    }
-
-    private void onShutDown() {
+    void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
@@ -425,6 +395,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
                                           final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -445,7 +417,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -466,6 +438,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
             final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -486,7 +460,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -530,7 +504,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
 
-            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Boolean result) {
                     if (failure == null) {
@@ -558,7 +532,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
@@ -583,7 +557,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }
 
@@ -611,32 +585,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onPrefixShardCreated(final PrefixShardCreated message) {
-        LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
-
-        final PrefixShardConfiguration config = message.getConfiguration();
-        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
-                ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
-        final String shardName = shardId.getShardName();
-
-        if (isPreviousShardActorStopInProgress(shardName, message)) {
-            return;
-        }
-
-        if (localShards.containsKey(shardName)) {
-            LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
-            final PrefixShardConfiguration existing =
-                    configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
-
-            if (existing != null && existing.equals(config)) {
-                // we don't have to do nothing here
-                return;
-            }
-        }
-
-        doCreatePrefixShard(config, shardId, shardName);
-    }
-
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+        justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
         if (stopOnComplete == null) {
@@ -657,43 +607,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return true;
     }
 
-    private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
-            final String shardName) {
-        configuration.addPrefixShardConfiguration(config);
-
-        final Builder builder = newShardDatastoreContextBuilder(shardName);
-        builder.logicalStoreType(config.getPrefix().getDatastoreType())
-                .storeRoot(config.getPrefix().getRootIdentifier());
-        DatastoreContext shardDatastoreContext = builder.build();
-
-        final Map<String, String> peerAddresses = getPeerAddresses(shardName);
-        final boolean isActiveMember = true;
-
-        LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
-                persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
-
-        final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
-                shardDatastoreContext, Shard.builder(), peerAddressResolver);
-        info.setActiveMember(isActiveMember);
-        localShards.put(info.getShardName(), info);
-
-        if (schemaContext != null) {
-            info.setSchemaContext(schemaContext);
-            info.setActor(newShardActor(info));
-        }
-    }
-
-    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
-        LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
-
-        final DOMDataTreeIdentifier prefix = message.getPrefix();
-        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
-                ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
-
-        configuration.removePrefixShardConfiguration(prefix);
-        removeShard(shardId);
-    }
-
     private void doCreateShard(final CreateShard createShard) {
         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
         final String shardName = moduleShardConfig.getShardName();
@@ -756,10 +669,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void checkReady() {
         if (isReadyWithLeaderId()) {
-            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-            waitTillReadyCountdownLatch.countDown();
+            LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
+            readinessFuture.set(null);
         }
     }
 
@@ -1136,7 +1047,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
-        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+        schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
 
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
 
@@ -1229,7 +1140,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1288,10 +1199,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId,
-                                        Map<String, String> peerAddresses,
-                                        DatastoreContext datastoreContext,
-                                        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+    ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
+                                        final Map<String, String> peerAddresses,
+                                        final DatastoreContext datastoreContext,
+                                        final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
         return new ShardInformation(shardName, shardId, peerAddresses,
                 datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
                 peerAddressResolver);
@@ -1428,12 +1339,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
         sender.tell(new Status.Failure(new AlreadyExistsException(
             String.format("Local shard %s already exists", shardName))), getSelf());
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
                                 final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -1468,6 +1383,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
@@ -1517,7 +1434,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
@@ -1721,7 +1638,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
                     Timeout.apply(30, TimeUnit.SECONDS));
 
-            future.onComplete(new OnComplete<Object>() {
+            future.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Object response) {
                     if (failure != null) {
@@ -1770,7 +1687,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1816,7 +1733,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 shardReplicaOperationsInProgress.remove(shardName);
@@ -1964,10 +1881,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
          */
         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
                 final String persistenceId, final ActorRef shardManagerActor) {
-            this.targetActor = Preconditions.checkNotNull(targetActor);
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.persistenceId = Preconditions.checkNotNull(persistenceId);
-            this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+            this.targetActor = requireNonNull(targetActor);
+            this.shardName = requireNonNull(shardName);
+            this.persistenceId = requireNonNull(persistenceId);
+            this.shardManagerActor = requireNonNull(shardManagerActor);
         }
 
         public ActorRef getTargetActor() {