BUG 2817 : Handle ServerRemoved message in Shard/ShardManager 04/29804/5
authorMoiz Raja <moraja@cisco.com>
Tue, 17 Nov 2015 03:03:40 +0000 (19:03 -0800)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 19 Nov 2015 00:20:40 +0000 (00:20 +0000)
When a server is removed and the new ServerConfiguration is
replicated and consensus has been reached on it the RaftActor
sends a ServerReoved message to the Replica which has just been
removed.

This ServerRemoved messsage is received by the Shard and it
forwards the message to the ShardManager. The ShardManager
then removes the replica from it's persistent list.

Change-Id: I9252ab9d9768b549915d8cccf46f102127d97945
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 2e0313807ebbe44b2f190e41909d428149aedfef..e4cab8d4a6e4b9ff85b52257f4e489efc0b3c076 100644 (file)
@@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -116,8 +117,11 @@ public class Shard extends RaftActor {
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
 
+
     private ShardSnapshot restoreFromSnapshot;
 
     private ShardSnapshot restoreFromSnapshot;
 
+
+
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -159,6 +163,7 @@ public class Shard extends RaftActor {
         snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
 
 
         snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
 
 
+
     }
 
     private void setTransactionCommitTimeout() {
     }
 
     private void setTransactionCommitTimeout() {
@@ -256,8 +261,10 @@ public class Shard extends RaftActor {
                 context().parent().tell(message, self());
             } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
                 sender().tell(getShardMBean(), self());
                 context().parent().tell(message, self());
             } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
                 sender().tell(getShardMBean(), self());
-            } else if(message instanceof GetShardDataTree){
+            } else if(message instanceof GetShardDataTree) {
                 sender().tell(store.getDataTree(), self());
                 sender().tell(store.getDataTree(), self());
+            } else if(message instanceof ServerRemoved){
+                context().parent().forward(message, context());
             } else {
                 super.onReceiveCommand(message);
             }
             } else {
                 super.onReceiveCommand(message);
             }
@@ -330,7 +337,7 @@ public class Shard extends RaftActor {
             applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
             Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
             applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
             Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
-                DataTreeCandidatePayload.create(candidate));
+                    DataTreeCandidatePayload.create(candidate));
         }
     }
 
         }
     }
 
index bbac0e3e25da0cac560b26fe639a50a250d743a9..616f56c466bbac02194460dca1d07b4a57b2569f 100644 (file)
@@ -46,6 +46,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
@@ -85,6 +86,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -227,8 +229,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if(message instanceof ServerRemoved){
+            onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof SaveSnapshotSuccess) {
         } else if (message instanceof SaveSnapshotSuccess) {
-            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
         } else if (message instanceof SaveSnapshotFailure) {
             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
                 persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else if (message instanceof SaveSnapshotFailure) {
             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
                 persistenceId(), ((SaveSnapshotFailure)message).cause());
@@ -237,6 +241,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
+    private void onShardReplicaRemoved(ServerRemoved message) {
+        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+        if(shardInformation == null) {
+            LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+            return;
+        } else if(shardInformation.getActor() != null) {
+            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+        }
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+        persistShardList();
+    }
+
     private void onGetSnapshot() {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
     private void onGetSnapshot() {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
@@ -1156,6 +1174,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardName;
         }
 
             return shardName;
         }
 
+        @Nullable
         ActorRef getActor(){
             return actor;
         }
         ActorRef getActor(){
             return actor;
         }
index 2072af68d42b30775a68241eba6d05b1a05c2607..3257e8f910e0538ec99bbd0f608daf2b5056e1bd 100644 (file)
@@ -25,6 +25,7 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
+import akka.actor.Terminated;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
@@ -104,6 +105,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -331,7 +333,7 @@ public class ShardManagerTest extends AbstractActorTest {
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
-            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
         }};
     }
 
         }};
     }
 
@@ -432,7 +434,7 @@ public class ShardManagerTest extends AbstractActorTest {
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
-            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
         }};
     }
 
         }};
     }
 
@@ -1449,6 +1451,75 @@ public class ShardManagerTest extends AbstractActorTest {
 
     }
 
 
     }
 
+    @Test
+    public void testServerRemovedShardActorNotRunning() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                            put("default", Arrays.asList("member-1", "member-2")).
+                            put("astronauts", Arrays.asList("member-2")).
+                            put("people", Arrays.asList("member-1", "member-2")).build());
+
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            shardManager.underlyingActor().waitForRecoveryComplete();
+
+            shardManager.tell(new FindLocalShard("people", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            shardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            // Removed the default shard replica from member-1
+            ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+            final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+            shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+            shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+        }};
+    }
+
+    @Test
+    public void testServerRemovedShardActorRunning() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                            put("default", Arrays.asList("member-1", "member-2")).
+                            put("astronauts", Arrays.asList("member-2")).
+                            put("people", Arrays.asList("member-1", "member-2")).build());
+
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            TestActorRef<MessageCollectorActor> shard = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+            watch(shard);
+
+            shardManager.underlyingActor().waitForRecoveryComplete();
+
+            shardManager.underlyingActor().addShardActor("default", shard);
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager.tell(new FindLocalShard("people", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            shardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            // Removed the default shard replica from member-1
+            ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+            final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+            shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+            shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+
+            expectMsgClass(duration("5 seconds"), Terminated.class);
+        }};
+    }
+
+
     @Test
     public void testShardPersistenceWithRestoredData() throws Exception {
         new JavaTestKit(getSystem()) {{
     @Test
     public void testShardPersistenceWithRestoredData() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -1485,6 +1556,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final CountDownLatch snapshotPersist = new CountDownLatch(1);
+        private ShardManagerSnapshot snapshot;
+        private Map<String, ActorRef> shardActors = new HashMap<>();
 
         private TestShardManager(Builder builder) {
             super(builder);
 
         private TestShardManager(Builder builder) {
             super(builder);
@@ -1523,6 +1597,30 @@ public class ShardManagerTest extends AbstractActorTest {
                 return Props.create(TestShardManager.class, this);
             }
         }
                 return Props.create(TestShardManager.class, this);
             }
         }
+
+        @Override
+        public void saveSnapshot(Object obj) {
+            snapshot = (ShardManagerSnapshot) obj;
+            snapshotPersist.countDown();
+        }
+
+        void verifySnapshotPersisted(Set<String> shardList) {
+            assertEquals("saveSnapshot invoked", true,
+                    Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
+            assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
+        }
+
+        @Override
+        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            if(shardActors.get(info.getShardName()) != null){
+                return shardActors.get(info.getShardName());
+            }
+            return super.newShardActor(schemaContext, info);
+        }
+
+        public void addShardActor(String shardName, ActorRef actorRef){
+            shardActors.put(shardName, actorRef);
+        }
     }
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
     }
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
index afd354317caa26eea6c3ec607c92e319e8c08df3..f097c19e512a3766a3d836c5cc7e061862b29666 100644 (file)
@@ -94,6 +94,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -2658,4 +2659,19 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
+
+    @Test
+    public void testServerRemoved() throws Exception {
+        final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+        final ActorRef shard = parent.underlyingActor().context().actorOf(
+                newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testServerRemoved");
+
+        shard.tell(new ServerRemoved("test"), ActorRef.noSender());
+
+        MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
+
+    }
+
 }
 }