BUG 2817 : Handle ServerRemoved message in Shard/ShardManager 04/29804/5
authorMoiz Raja <moraja@cisco.com>
Tue, 17 Nov 2015 03:03:40 +0000 (19:03 -0800)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 19 Nov 2015 00:20:40 +0000 (00:20 +0000)
When a server is removed, and the new ServerConfiguration has been
replicated and consensus has been reached on it, the RaftActor
sends a ServerRemoved message to the Replica which has just been
removed.

This ServerRemoved message is received by the Shard and it
forwards the message to the ShardManager. The ShardManager
then removes the replica from its persistent list.

Change-Id: I9252ab9d9768b549915d8cccf46f102127d97945
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 2e03138..e4cab8d 100644 (file)
@@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -116,8 +117,11 @@ public class Shard extends RaftActor {
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
 
+
     private ShardSnapshot restoreFromSnapshot;
 
+
+
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -159,6 +163,7 @@ public class Shard extends RaftActor {
         snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
 
 
+
     }
 
     private void setTransactionCommitTimeout() {
@@ -256,8 +261,10 @@ public class Shard extends RaftActor {
                 context().parent().tell(message, self());
             } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
                 sender().tell(getShardMBean(), self());
-            } else if(message instanceof GetShardDataTree){
+            } else if(message instanceof GetShardDataTree) {
                 sender().tell(store.getDataTree(), self());
+            } else if(message instanceof ServerRemoved){
+                context().parent().forward(message, context());
             } else {
                 super.onReceiveCommand(message);
             }
@@ -330,7 +337,7 @@ public class Shard extends RaftActor {
             applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
             Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
-                DataTreeCandidatePayload.create(candidate));
+                    DataTreeCandidatePayload.create(candidate));
         }
     }
 
index bbac0e3..616f56c 100644 (file)
@@ -46,6 +46,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
@@ -85,6 +86,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -227,8 +229,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if(message instanceof ServerRemoved){
+            onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof SaveSnapshotSuccess) {
-            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
         } else if (message instanceof SaveSnapshotFailure) {
             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
                 persistenceId(), ((SaveSnapshotFailure)message).cause());
@@ -237,6 +241,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onShardReplicaRemoved(ServerRemoved message) {
+        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+        if(shardInformation == null) {
+            LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+            return;
+        } else if(shardInformation.getActor() != null) {
+            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+        }
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+        persistShardList();
+    }
+
     private void onGetSnapshot() {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
@@ -1156,6 +1174,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardName;
         }
 
+        @Nullable
         ActorRef getActor(){
             return actor;
         }
index 2072af6..3257e8f 100644 (file)
@@ -25,6 +25,7 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
+import akka.actor.Terminated;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
@@ -104,6 +105,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -331,7 +333,7 @@ public class ShardManagerTest extends AbstractActorTest {
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
-            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
         }};
     }
 
@@ -432,7 +434,7 @@ public class ShardManagerTest extends AbstractActorTest {
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
-            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
         }};
     }
 
@@ -1449,6 +1451,75 @@ public class ShardManagerTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testServerRemovedShardActorNotRunning() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                            put("default", Arrays.asList("member-1", "member-2")).
+                            put("astronauts", Arrays.asList("member-2")).
+                            put("people", Arrays.asList("member-1", "member-2")).build());
+
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            shardManager.underlyingActor().waitForRecoveryComplete();
+
+            shardManager.tell(new FindLocalShard("people", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            shardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            // Removed the default shard replica from member-1
+            ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+            final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+            shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+            shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+        }};
+    }
+
+    @Test
+    public void testServerRemovedShardActorRunning() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                            put("default", Arrays.asList("member-1", "member-2")).
+                            put("astronauts", Arrays.asList("member-2")).
+                            put("people", Arrays.asList("member-1", "member-2")).build());
+
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            TestActorRef<MessageCollectorActor> shard = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+            watch(shard);
+
+            shardManager.underlyingActor().waitForRecoveryComplete();
+
+            shardManager.underlyingActor().addShardActor("default", shard);
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager.tell(new FindLocalShard("people", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            shardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            // Removed the default shard replica from member-1
+            ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+            final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+            shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+            shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+
+            expectMsgClass(duration("5 seconds"), Terminated.class);
+        }};
+    }
+
+
     @Test
     public void testShardPersistenceWithRestoredData() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -1485,6 +1556,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final CountDownLatch snapshotPersist = new CountDownLatch(1);
+        private ShardManagerSnapshot snapshot;
+        private Map<String, ActorRef> shardActors = new HashMap<>();
 
         private TestShardManager(Builder builder) {
             super(builder);
@@ -1523,6 +1597,30 @@ public class ShardManagerTest extends AbstractActorTest {
                 return Props.create(TestShardManager.class, this);
             }
         }
+
+        @Override
+        public void saveSnapshot(Object obj) {
+            snapshot = (ShardManagerSnapshot) obj;
+            snapshotPersist.countDown();
+        }
+
+        void verifySnapshotPersisted(Set<String> shardList) {
+            assertEquals("saveSnapshot invoked", true,
+                    Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
+            assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
+        }
+
+        @Override
+        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            if(shardActors.get(info.getShardName()) != null){
+                return shardActors.get(info.getShardName());
+            }
+            return super.newShardActor(schemaContext, info);
+        }
+
+        public void addShardActor(String shardName, ActorRef actorRef){
+            shardActors.put(shardName, actorRef);
+        }
     }
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
index afd3543..f097c19 100644 (file)
@@ -94,6 +94,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -2658,4 +2659,19 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
+
+    @Test
+    public void testServerRemoved() throws Exception {
+        final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+        final ActorRef shard = parent.underlyingActor().context().actorOf(
+                newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testServerRemoved");
+
+        shard.tell(new ServerRemoved("test"), ActorRef.noSender());
+
+        MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
+
+    }
+
 }