From def2aa2710cabf4d1867e8ce5dd847d380ef9393 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 5 May 2017 06:41:59 -0400 Subject: [PATCH] Bug 8385: Fix testMultipleRegistrationsAtOnePrefix failures The test quickly creates/removes the prefix shard in iterations which can result in an InvalidActorNameException if the shard actor from the prior iteration hadn't been destroyed yet. To alleviate this I modified the removal in the ShardManager to utilize Patterns.gracefulStop to store the Future and block a subsequent create until the Future completes. Change-Id: Ica98de3cc17c2d87195840bdf052d81ed3b9dd10 Signed-off-by: Tom Pantelis --- .../datastore/shardmanager/ShardManager.java | 100 +++++++++++++----- ...ributedShardedDOMDataTreeRemotingTest.java | 10 ++ 2 files changed, 81 insertions(+), 29 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 38fed0722f..bb289bc82e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -164,6 +164,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set shardReplicaOperationsInProgress = new HashSet<>(); + private final Map> shardActorStoppingFutures = new HashMap<>(); + private final String persistenceId; private final AbstractDataStore dataStore; @@ -462,16 +464,41 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onShardReplicaRemoved(ServerRemoved message) { - final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build(); - final ShardInformation shardInformation = localShards.remove(shardId.getShardName()); + removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build()); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void removeShard(final ShardIdentifier shardId) { + final String shardName = shardId.getShardName(); + final ShardInformation shardInformation = localShards.remove(shardName); if (shardInformation == null) { LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString()); return; - } else if (shardInformation.getActor() != null) { - LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor()); - shardInformation.getActor().tell(Shutdown.INSTANCE, self()); } - LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); + + final ActorRef shardActor = shardInformation.getActor(); + if (shardActor != null) { + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor); + FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().$times(3); + final Future stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE); + shardActorStoppingFutures.put(shardName, stopFuture); + stopFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Boolean result) { + if (failure == null) { + LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor); + } else { + LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure); + } + + self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName), + ActorRef.noSender()); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName); persistShardList(); } @@ -532,11 +559,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); final String shardName = shardId.getShardName(); + if (isPreviousShardActorStopInProgress(shardName, message)) { + return; + } + if (localShards.containsKey(shardName)) { LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); final PrefixShardConfiguration existing = @@ -551,6 +581,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { doCreatePrefixShard(config, shardId, shardName); } + private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { + final Future stopFuture = shardActorStoppingFutures.get(shardName); + if (stopFuture == null) { + return false; + } + + LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(), + shardName, messageToDefer); + final ActorRef sender = getSender(); + stopFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Boolean result) { + LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer); + self().tell(messageToDefer, sender); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + + return true; + } + private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) { configuration.addPrefixShardConfiguration(config); @@ -581,22 +631,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final DOMDataTreeIdentifier prefix = message.getPrefix(); final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - final ShardInformation shard = localShards.remove(shardId.getShardName()); configuration.removePrefixShardConfiguration(prefix); - - if (shard == null) { - LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix); - return; - } - - if (shard.getActor() != null) { - LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor()); - shard.getActor().tell(Shutdown.INSTANCE, self()); - } - - LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); - persistShardList(); + removeShard(shardId); } private void doCreateShard(final CreateShard createShard) { @@ -1063,9 +1100,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @VisibleForTesting - protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { - return getContext().actorOf(info.newProps(schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) { + return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath), + info.getShardId().toString()); } private void findPrimary(FindPrimary message) { @@ -1256,8 +1293,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(), - response, getSender()), getTargetActor()); + final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(), + message.getShardPrefix(), response, getSender()); + if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { + getSelf().tell(runnable, getTargetActor()); + } } @Override @@ -1293,15 +1333,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), - getTargetActor()); + final RunnableMessage runnable = (RunnableMessage) () -> + addShard(getShardName(), response, getSender()); + if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { + getSelf().tell(runnable, getTargetActor()); + } } @Override public void onLocalPrimaryFound(LocalPrimaryShardFound message) { sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); } - }); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java index 30abf8c4e2..b2ef45a3dd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java @@ -175,6 +175,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { @Test public void testProducerRegistrations() throws Exception { + LOG.info("testProducerRegistrations starting"); initEmptyDatastores(); leaderTestKit.waitForMembersUp("member-2"); @@ -233,10 +234,13 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { } shardRegistration.close().toCompletableFuture().get(); + + LOG.info("testProducerRegistrations ending"); } @Test public void testWriteIntoMultipleShards() throws Exception { + LOG.info("testWriteIntoMultipleShards starting"); initEmptyDatastores(); leaderTestKit.waitForMembersUp("member-2"); @@ -277,10 +281,13 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { tx.submit().checkedGet(); shardRegistration.close().toCompletableFuture().get(); + + LOG.info("testWriteIntoMultipleShards ending"); } @Test public void testMultipleShardRegistrations() throws Exception { + LOG.info("testMultipleShardRegistrations starting"); initEmptyDatastores(); final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( @@ -371,10 +378,12 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); LOG.debug("All follower shards gone"); + LOG.info("testMultipleShardRegistrations ending"); } @Test public void testMultipleRegistrationsAtOnePrefix() throws Exception { + LOG.info("testMultipleRegistrationsAtOnePrefix starting"); initEmptyDatastores(); for (int i = 0; i < 10; i++) { @@ -408,5 +417,6 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); } + LOG.info("testMultipleRegistrationsAtOnePrefix ending"); } } -- 2.36.6