Merge "Handle FollowerInitialSyncStatus message in Shard/ShardManager"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 10 Mar 2015 16:58:08 +0000 (16:58 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 10 Mar 2015 16:58:08 +0000 (16:58 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 3ce1f5d1e89038c7d4338f099cb0f49b909043cc..72b5ac95153f07c1b315e01c809f0b76f985b433 100644 (file)
@@ -18,12 +18,18 @@ package org.opendaylight.controller.cluster.raft.base.messages;
  */
 public class FollowerInitialSyncUpStatus {
     private final boolean initialSyncDone;
+    private final String name;
 
-    public FollowerInitialSyncUpStatus(boolean initialSyncDone){
+    public FollowerInitialSyncUpStatus(boolean initialSyncDone, String name){
         this.initialSyncDone = initialSyncDone;
+        this.name = name;
     }
 
     public boolean isInitialSyncDone() {
         return initialSyncDone;
     }
+
+    public String getName() {
+        return name;
+    }
 }
index ef5f11e37aef4fe7490887a27d91caedb0e50c51..e814cd000dda4ff2dddbc38665a37c80fc818bdf 100644 (file)
@@ -476,4 +476,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         }
     }
 
+    protected String getId(){
+        return context.getId();
+    }
+
 }
index 618865cb88eb8877cdcfdcfb29208c80707c2c0f..0f251a3012e7afe492b867ced86859316fde6d88 100644 (file)
@@ -352,7 +352,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return snapshotTracker;
     }
 
-    private static class InitialSyncStatusTracker {
+    private class InitialSyncStatusTracker {
 
         private static final long INVALID_LOG_INDEX = -2L;
         private long initialLeaderCommit = INVALID_LOG_INDEX;
@@ -374,10 +374,10 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if(!initialSyncUpDone){
                 if(initialLeaderCommit == INVALID_LOG_INDEX){
-                    actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender());
+                    actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender());
                     initialLeaderCommit = leaderCommit;
                 } else if(commitIndex >= initialLeaderCommit){
-                    actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender());
+                    actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender());
                     initialSyncUpDone = true;
                 }
             }
index 52b4652de6e7d464d9d54c67ebea37ca233de862..e704e42465b99e1183ca20c19742ccd2424e410f 100644 (file)
@@ -67,6 +67,7 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
@@ -295,6 +296,9 @@ public class Shard extends RaftActor {
                 onDatastoreContext((DatastoreContext)message);
             } else if(message instanceof RegisterRoleChangeListener){
                 roleChangeNotifier.get().forward(message, context());
+            } else if (message instanceof FollowerInitialSyncUpStatus){
+                shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+                context().parent().tell(message, self());
             } else {
                 super.onReceiveCommand(message);
             }
index c441afb49787fb7d5ae946c7fc0e0e91ec7137ad..136c6813eaba9d7d116b4ba0b4609bbd4848fb13 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -55,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -166,16 +168,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             ignoreMessage(message);
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
-        } else if(message instanceof RoleChangeNotification){
+        } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
+        } else if(message instanceof FollowerInitialSyncUpStatus){
+            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else{
             unknownMessage(message);
         }
 
     }
 
-    private void onRoleChangeNotification(RoleChangeNotification message) {
-        RoleChangeNotification roleChanged = message;
+    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+        LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
+                status.isInitialSyncDone());
+
+        ShardInformation shardInformation = findShardInformation(status.getName());
+
+        if(shardInformation != null) {
+            shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
+
+            mBean.setSyncStatus(isInSync());
+        }
+
+    }
+
+    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
         LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
@@ -189,6 +206,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 waitTillReadyCountdownLatch.countDown();
             }
+
+            mBean.setSyncStatus(isInSync());
         }
     }
 
@@ -214,6 +233,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return isReady;
     }
 
+    private boolean isInSync(){
+        for (ShardInformation info : localShards.values()) {
+            if(!info.isInSync()){
+                return false;
+            }
+        }
+        return true;
+    }
+
     private void onActorInitialized(Object message) {
         final ActorRef sender = getSender();
 
@@ -519,6 +547,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return dataPersistenceProvider;
     }
 
+    @VisibleForTesting
+    ShardManagerInfoMBean getMBean(){
+        return mBean;
+    }
+
     private class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
@@ -529,6 +562,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
+        private boolean followerSyncStatus = false;
+
         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
         private String role ;
 
@@ -607,6 +642,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return this.role;
         }
 
+        public void setFollowerSyncStatus(boolean syncStatus){
+            this.followerSyncStatus = syncStatus;
+        }
+
+        public boolean isInSync(){
+            if(RaftState.Follower.name().equals(this.role)){
+                return followerSyncStatus;
+            } else if(RaftState.Leader.name().equals(this.role)){
+                return true;
+            }
+
+            return false;
+        }
+
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
index 945ae0a4786ab931a62c071533ec89842847a84e..6222d3be09fce8ba46ee0a8b94cfd4e4bd32fe41 100644 (file)
@@ -67,6 +67,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
+    private boolean followerInitialSyncStatus = false;
+
     public ShardStats(final String shardName, final String mxBeanType) {
         super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
     }
@@ -276,4 +278,13 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     public void setDataStore(final InMemoryDOMDataStore store) {
         setNotificationManager(store.getDataChangeListenerNotificationManager());
     }
+
+    public void setFollowerInitialSyncStatus(boolean followerInitialSyncStatus) {
+        this.followerInitialSyncStatus = followerInitialSyncStatus;
+    }
+
+    @Override
+    public boolean getFollowerInitialSyncStatus() {
+        return followerInitialSyncStatus;
+    }
 }
index 99c8daf87d30af3ce66bf3b5c42aa86133ec5575..8adc8b24b27c1ad4d3eff65730cf9d27646fd3d0 100644 (file)
@@ -18,6 +18,8 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo
 
     private final List<String> localShards;
 
+    private boolean syncStatus = false;
+
     public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
         super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
         this.localShards = localShards;
@@ -36,4 +38,13 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo
     public List<String> getLocalShards() {
         return localShards;
     }
+
+    @Override
+    public boolean getSyncStatus() {
+        return this.syncStatus;
+    }
+
+    public void setSyncStatus(boolean syncStatus){
+        this.syncStatus = syncStatus;
+    }
 }
index f0cdacc9ef2bc0e08ccb0348a8d990279de8f57a..c005751380517d626cf5dc8012d71016e41c0962 100644 (file)
@@ -20,8 +20,10 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -48,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -469,6 +472,132 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
 
+    @Test
+    public void testByDefaultSyncStatusIsFalse() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+    }
+
+    @Test
+    public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Follower.name(), RaftState.Leader.name()));
+
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+    }
+
+    @Test
+    public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Follower.name(), RaftState.Candidate.name()));
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+    }
+
+    @Test
+    public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Candidate.name(), RaftState.Follower.name()));
+
+        // Initially will be false
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Send status true will make sync status true
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+        // Send status false will make sync status false
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+    }
+
+    @Test
+    public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration() {
+                    @Override
+                    public List<String> getMemberShardNames(String memberName) {
+                        return Arrays.asList("default", "astronauts");
+                    }
+                },
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+
+        // Initially will be false
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make default shard leader
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Follower.name(), RaftState.Leader.name()));
+
+        // default = Leader, astronauts is unknown so sync status remains false
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make astronauts shard leader as well
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+                RaftState.Follower.name(), RaftState.Leader.name()));
+
+        // Now sync status should be true
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make astronauts a Follower
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+                RaftState.Leader.name(), RaftState.Follower.name()));
+
+        // Sync status is not true
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make the astronauts follower sync status true
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
+
+        // Sync status is now true
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+    }
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
index 7dfbd668b811231b4b32edd76a04987c671aba99..d930b2519e379f9b1f9bb54a5482e25857cb1faa 100644 (file)
@@ -84,6 +84,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@ -1618,7 +1619,25 @@ public class ShardTest extends AbstractActorTest {
                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
 
                 assertEquals(1, allMatching.size());
-            }};
+            }
+        };
+    }
+
+    @Test
+    public void testFollowerInitialSyncStatus() throws Exception {
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testFollowerInitialSyncStatus");
+
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
+
+        assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
+
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
+
+        assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }