From 4680d02510a884b3a893345f423cedcc8c5af0f4 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Mon, 16 Nov 2015 19:03:40 -0800 Subject: [PATCH] BUG 2817 : Handle ServerRemoved message in Shard/ShardManager When a server is removed and the new ServerConfiguration is replicated and consensus has been reached on it, the RaftActor sends a ServerRemoved message to the Replica which has just been removed. This ServerRemoved message is received by the Shard and it forwards the message to the ShardManager. The ShardManager then removes the replica from its persistent list. Change-Id: I9252ab9d9768b549915d8cccf46f102127d97945 Signed-off-by: Moiz Raja --- .../controller/cluster/datastore/Shard.java | 11 ++- .../controller/cluster/datastore/ShardManager.java | 21 ++++- .../cluster/datastore/ShardManagerTest.java | 102 ++++++++++++++++++++- .../controller/cluster/datastore/ShardTest.java | 16 ++++ 4 files changed, 145 insertions(+), 5 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 2e0313807e..e4cab8d4a6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import 
org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -116,8 +117,11 @@ public class Shard extends RaftActor { private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this); + private ShardSnapshot restoreFromSnapshot; + + protected Shard(AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -159,6 +163,7 @@ public class Shard extends RaftActor { snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name); + } private void setTransactionCommitTimeout() { @@ -256,8 +261,10 @@ public class Shard extends RaftActor { context().parent().tell(message, self()); } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){ sender().tell(getShardMBean(), self()); - } else if(message instanceof GetShardDataTree){ + } else if(message instanceof GetShardDataTree) { sender().tell(store.getDataTree(), self()); + } else if(message instanceof ServerRemoved){ + context().parent().forward(message, context()); } else { super.onReceiveCommand(message); } @@ -330,7 +337,7 @@ public class Shard extends RaftActor { applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate); } else { Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), - DataTreeCandidatePayload.create(candidate)); + DataTreeCandidatePayload.create(candidate)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index bbac0e3e25..616f56c466 
100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -85,6 +86,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -227,8 +229,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onRemoveShardReplica((RemoveShardReplica)message); } else if(message instanceof GetSnapshot) { onGetSnapshot(); + } else if(message instanceof ServerRemoved){ + onShardReplicaRemoved((ServerRemoved) message); } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId()); + LOG.debug("{} saved ShardManager snapshot successfully", persistenceId()); } else if (message instanceof SaveSnapshotFailure) { LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), 
((SaveSnapshotFailure)message).cause()); @@ -237,6 +241,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onShardReplicaRemoved(ServerRemoved message) { + final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build(); + final ShardInformation shardInformation = localShards.remove(shardId.getShardName()); + if(shardInformation == null) { + LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString()); + return; + } else if(shardInformation.getActor() != null) { + LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor()); + shardInformation.getActor().tell(PoisonPill.getInstance(), self()); + } + LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); + persistShardList(); + } + private void onGetSnapshot() { LOG.debug("{}: onGetSnapshot", persistenceId()); @@ -1156,6 +1174,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardName; } + @Nullable ActorRef getActor(){ return actor; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 2072af68d4..3257e8f910 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -25,6 +25,7 @@ import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; import akka.actor.Status.Success; +import akka.actor.Terminated; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import 
akka.dispatch.Dispatchers; @@ -104,6 +105,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -331,7 +333,7 @@ public class ShardManagerTest extends AbstractActorTest { LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); - assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; } @@ -432,7 +434,7 @@ public class ShardManagerTest extends AbstractActorTest { LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); - assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; } @@ -1449,6 +1451,75 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testServerRemovedShardActorNotRunning() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). 
+ put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")). + put("people", Arrays.asList("member-1", "member-2")).build()); + + TestActorRef shardManager = TestActorRef.create(getSystem(), + newShardMgrProps(mockConfig)); + + shardManager.underlyingActor().waitForRecoveryComplete(); + + shardManager.tell(new FindLocalShard("people", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + shardManager.tell(new FindLocalShard("default", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + // Removed the default shard replica from member-1 + ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); + final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); + shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); + + shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); + }}; + } + + @Test + public void testServerRemovedShardActorRunning() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")). 
+ put("people", Arrays.asList("member-1", "member-2")).build()); + + TestActorRef shardManager = TestActorRef.create(getSystem(), + newShardMgrProps(mockConfig)); + + TestActorRef shard = TestActorRef.create(getSystem(), MessageCollectorActor.props()); + + watch(shard); + + shardManager.underlyingActor().waitForRecoveryComplete(); + + shardManager.underlyingActor().addShardActor("default", shard); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindLocalShard("people", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + shardManager.tell(new FindLocalShard("default", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + // Removed the default shard replica from member-1 + ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); + final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); + shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); + + shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); + + expectMsgClass(duration("5 seconds"), Terminated.class); + }}; + } + + @Test public void testShardPersistenceWithRestoredData() throws Exception { new JavaTestKit(getSystem()) {{ @@ -1485,6 +1556,9 @@ public class ShardManagerTest extends AbstractActorTest { private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final CountDownLatch snapshotPersist = new CountDownLatch(1); + private ShardManagerSnapshot snapshot; + private Map shardActors = new HashMap<>(); private TestShardManager(Builder builder) { super(builder); @@ -1523,6 +1597,30 @@ public class ShardManagerTest extends AbstractActorTest { return Props.create(TestShardManager.class, this); } } + + @Override + public void saveSnapshot(Object obj) { + snapshot = 
(ShardManagerSnapshot) obj; + snapshotPersist.countDown(); + } + + void verifySnapshotPersisted(Set shardList) { + assertEquals("saveSnapshot invoked", true, + Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); + assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + if(shardActors.get(info.getShardName()) != null){ + return shardActors.get(info.getShardName()); + } + return super.newShardActor(schemaContext, info); + } + + public void addShardActor(String shardName, ActorRef actorRef){ + shardActors.put(shardName, actorRef); + } } private static class DelegatingShardManagerCreator implements Creator { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index afd354317c..f097c19e51 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -94,6 +94,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -2658,4 +2659,19 @@ public class ShardTest 
extends AbstractShardTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } + + @Test + public void testServerRemoved() throws Exception { + final TestActorRef parent = TestActorRef.create(getSystem(), MessageCollectorActor.props()); + + final ActorRef shard = parent.underlyingActor().context().actorOf( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testServerRemoved"); + + shard.tell(new ServerRemoved("test"), ActorRef.noSender()); + + MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class); + + } + } -- 2.16.6