BUG 2187 - Persisting shard list in ShardManager 70/29370/7
authorkalaiselvik <Kalaiselvi_K@Dell.com>
Sat, 14 Nov 2015 19:00:42 +0000 (00:30 +0530)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 16 Nov 2015 05:57:33 +0000 (00:57 -0500)
In ShardManager, the local shard list is persisted as a snapshot.
On recovery, persisted shard list is used to create the shards.
During recovery, obtained persisted information is updated to the
configuration so that it is uniformly available to the DatastoreContext.

Incorporated the comments

Also, as localShards are now created after RecoveryCompletion, the
shardManager mbean is associated with the shardManager immediately
after creation. On creating the localShards, the shards addition
is notified to the mbean object.
In the shardManagerTests involving verification of the syncStatus
and CountDownLatch objects, the testcases are made to wait for
localShard creation by waiting for recoveryCompletion message.

Change-Id: I523ed9b14af4b1b6e272f05faac1cf37abfef336
Signed-off-by: kalaiselvik <Kalaiselvi_K@Dell.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ModuleConfig.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java

index 49e1fe3..98a6090 100644 (file)
@@ -21,6 +21,9 @@ import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
@@ -34,6 +37,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -116,7 +120,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfo mBean;
+    private final ShardManagerInfo mBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
@@ -149,7 +153,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        createLocalShards();
+        List<String> localShardActorNames = new ArrayList<>();
+        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+                localShardActorNames);
+        mBean.setShardManager(this);
     }
 
     @Override
@@ -199,6 +208,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
+                persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
@@ -428,6 +442,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
             // journal on upgrade from Helium.
             deleteMessages(lastSequenceNr());
+            createLocalShards();
+        } else if (message instanceof SnapshotOffer) {
+            handleShardRecovery((SnapshotOffer) message);
         }
     }
 
@@ -726,20 +743,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         restoreFromSnapshot = null; // null out to GC
 
-        List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
                         shardSnapshots.get(shardName)), peerAddressResolver));
+            mBean.addLocalShard(shardId.toString());
         }
-
-        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
-
-        mBean.setShardManager(this);
     }
 
     /**
@@ -870,6 +881,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
                           getPeerAddresses(shardName), datastoreContext,
                           Shard.builder(), peerAddressResolver);
+        shardInfo.setShardActiveMember(false);
         localShards.put(shardName, shardInfo);
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
 
@@ -916,6 +928,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+            shardInfo.setShardActiveMember(true);
+            persistShardList();
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
@@ -961,6 +975,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return;
     }
 
+    private void persistShardList() {
+        List<String> shardList = new ArrayList(localShards.keySet());
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isShardActiveMember()) {
+                shardList.remove(shardInfo.getShardName());
+            }
+        }
+        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(new ShardManagerSnapshot(shardList));
+    }
+
+    private void handleShardRecovery(SnapshotOffer offer) {
+        LOG.debug ("{}: in handleShardRecovery", persistenceId());
+        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+        String currentMember = cluster.getCurrentMemberName();
+        Set<String> configuredShardList =
+            new HashSet<>(configuration.getMemberShardNames(currentMember));
+        for (String shard : snapshot.getShardList()) {
+            if (!configuredShardList.contains(shard)) {
+                // add the current member as a replica for the shard
+                LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+                configuration.addMemberReplicaForShard(shard, currentMember);
+            } else {
+                configuredShardList.remove(shard);
+            }
+        }
+        for (String shard : configuredShardList) {
+            // remove the member as a replica for the shard
+            LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+            configuration.removeMemberReplicaForShard(shard, currentMember);
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -984,6 +1031,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private DatastoreContext datastoreContext;
         private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
+        private boolean shardActiveStatus = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
@@ -1182,6 +1230,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
+
+        void setShardActiveMember(boolean flag) {
+            shardActiveStatus = flag;
+        }
+
+        boolean isShardActiveMember() {
+            return shardActiveStatus;
+        }
     }
 
     private static class OnShardInitialized {
@@ -1307,13 +1363,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return this;
         }
 
-        public Props props() {
+        protected void verify() {
             sealed = true;
             Preconditions.checkNotNull(cluster, "cluster should not be null");
             Preconditions.checkNotNull(configuration, "configuration should not be null");
             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+        }
+
+        public Props props() {
+            verify();
             return Props.create(ShardManager.class, this);
         }
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java
new file mode 100644 (file)
index 0000000..4cc54dd
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Dell Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+/**
+ * Persisted data of the ShardManager
+ */
+
+public class ShardManagerSnapshot implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final List<String> shardList;
+
+    public ShardManagerSnapshot(@Nonnull List<String> shardList) {
+        this.shardList = new ArrayList<>(Preconditions.checkNotNull(shardList));
+    }
+
+    public List<String> getShardList() {
+        return this.shardList;
+    }
+
+    @Override
+    public String toString() {
+        return "ShardManagerSnapshot [ShardList = " + shardList + " ]";
+    }
+}
index dea77f5..b6122b3 100644 (file)
@@ -60,4 +60,14 @@ public interface Configuration {
      * Verifies if the given module shard in available in the cluster
      */
     boolean isShardConfigured(String shardName);
+
+    /**
+     * Adds the given member as the new replica for the given shardName
+     */
+    void addMemberReplicaForShard (String shardName, String memberName);
+
+    /**
+     * Removes the given member as a replica for the given shardName
+     */
+    void removeMemberReplicaForShard (String shardName, String memberName);
 }
index d553c66..b7bcc59 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -157,4 +158,47 @@ public class ConfigurationImpl implements Configuration {
         Preconditions.checkNotNull(shardName, "shardName should not be null");
         return allShardNames.contains(shardName);
     }
+
+    @Override
+    public void addMemberReplicaForShard (String shardName, String newMemberName) {
+        Preconditions.checkNotNull(shardName, "shardName should not be null");
+        Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
+
+        for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
+            ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
+            if(shardConfig != null) {
+                ModuleConfig newModuleConfig = new ModuleConfig(moduleConfig);
+                Set<String> replica = new HashSet<>(shardConfig.getReplicas());
+                replica.add(newMemberName);
+                newModuleConfig.addShardConfig(shardName, ImmutableSet.copyOf(replica));
+                updateModuleConfigMap(newModuleConfig);
+                return;
+            }
+        }
+    }
+
+    @Override
+    public void removeMemberReplicaForShard (String shardName, String newMemberName) {
+        Preconditions.checkNotNull(shardName, "shardName should not be null");
+        Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
+
+        for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
+            ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
+            if(shardConfig != null) {
+                ModuleConfig newModuleConfig = new ModuleConfig(moduleConfig);
+                Set<String> replica = new HashSet<>(shardConfig.getReplicas());
+                replica.remove(newMemberName);
+                newModuleConfig.addShardConfig(shardName, ImmutableSet.copyOf(replica));
+                updateModuleConfigMap(newModuleConfig);
+                return;
+            }
+        }
+    }
+
+    private void updateModuleConfigMap(ModuleConfig moduleConfig) {
+        HashMap<String, ModuleConfig> newModuleConfigMap = new HashMap<>(moduleConfigMap);
+        newModuleConfigMap.put(moduleConfig.getName(), moduleConfig);
+        moduleConfigMap = ImmutableMap.<String, ModuleConfig>builder().putAll(newModuleConfigMap).build();
+        return;
+    }
 }
index 1a309a0..b549467 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.config;
 
+import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +29,16 @@ public class ModuleConfig {
         this.name = name;
     }
 
+    public ModuleConfig(ModuleConfig moduleConfig) {
+        this.name = moduleConfig.getName();
+        this.nameSpace = moduleConfig.getNameSpace();
+        this.shardStrategy = moduleConfig.getShardStrategy();
+        for (ShardConfig shardConfig : moduleConfig.getShardConfigs()) {
+            shardConfigs.put(shardConfig.getName(), new ShardConfig(shardConfig.getName(),
+                ImmutableSet.copyOf(shardConfig.getReplicas())));
+        }
+    }
+
     public String getName() {
         return name;
     }
@@ -63,4 +74,8 @@ public class ModuleConfig {
     public void setShardStrategy(ShardStrategy shardStrategy) {
         this.shardStrategy = shardStrategy;
     }
-}
\ No newline at end of file
+
+    public ShardConfig removeShardConfig(String name) {
+        return shardConfigs.remove(name);
+    }
+}
index 29cb189..b6ea14f 100644 (file)
@@ -100,6 +100,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -137,6 +138,7 @@ public class ShardManagerTest extends AbstractActorTest {
         MockitoAnnotations.initMocks(this);
 
         InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
 
         if(mockShardActor == null) {
             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
@@ -149,6 +151,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @After
     public void tearDown() {
         InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
     private Props newShardMgrProps() {
@@ -163,9 +166,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps(Configuration config) {
-        return ShardManager.builder().cluster(new MockClusterWrapper()).configuration(config).
-                datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
-                waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache).props();
+        return TestShardManager.builder(datastoreContextBuilder).configuration(config).props();
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -188,6 +189,17 @@ public class ShardManagerTest extends AbstractActorTest {
         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
+    private TestShardManager newTestShardManager() {
+        return newTestShardManager(newShardMgrProps());
+    }
+
+    private TestShardManager newTestShardManager(Props props) {
+        TestActorRef<TestShardManager> shardManagerActor = TestActorRef.create(getSystem(), props);
+        TestShardManager shardManager = shardManagerActor.underlyingActor();
+        shardManager.waitForRecoveryComplete();
+        return shardManager;
+    }
+
     @Test
     public void testPerShardDatastoreContext() throws Exception {
         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
@@ -846,8 +858,7 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
 
         new JavaTestKit(getSystem()) {{
-            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
-                    Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
             shardManager.underlyingActor().waitForRecoveryComplete();
             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
@@ -862,196 +873,168 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
-                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
+        String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+        shardManager.onReceiveCommand(new RoleChangeNotification(
+                memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
 
-                verify(ready, never()).countDown();
+        verify(ready, never()).countDown();
 
-                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
-                        Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
+        shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+                Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
 
-                verify(ready, times(1)).countDown();
-
-            }};
+        verify(ready, times(1)).countDown();
     }
 
     @Test
     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
-
-                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        memberId, null, RaftState.Follower.name()));
+        new JavaTestKit(getSystem()) {{
+            TestShardManager shardManager = newTestShardManager();
 
-                verify(ready, never()).countDown();
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.onReceiveCommand(new RoleChangeNotification(
+                    memberId, null, RaftState.Follower.name()));
 
-                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+            verify(ready, never()).countDown();
 
-                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
-                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
-                        DataStoreVersions.CURRENT_VERSION));
+            shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
 
-                verify(ready, times(1)).countDown();
+            shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                    "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+                    DataStoreVersions.CURRENT_VERSION));
 
-            }};
+            verify(ready, times(1)).countDown();
+        }};
     }
 
     @Test
     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
-
-                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        memberId, null, RaftState.Follower.name()));
+        new JavaTestKit(getSystem()) {{
+            TestShardManager shardManager = newTestShardManager();
 
-                verify(ready, never()).countDown();
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
 
-                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
-                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
-                        DataStoreVersions.CURRENT_VERSION));
+            verify(ready, never()).countDown();
 
-                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+            shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                    "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+                    DataStoreVersions.CURRENT_VERSION));
 
-                verify(ready, times(1)).countDown();
+            shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
 
-            }};
+            verify(ready, times(1)).countDown();
+        }};
     }
 
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+        shardManager.onReceiveCommand(new RoleChangeNotification(
+                "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
 
-                verify(ready, never()).countDown();
-
-            }};
+        verify(ready, never()).countDown();
     }
 
-
     @Test
     public void testByDefaultSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = newShardMgrProps();
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
-
-        ShardManager shardManagerActor = shardManager.underlyingActor();
+        TestShardManager shardManager = newTestShardManager();
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
-        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
-        ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+        shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
-        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
-        ShardManager shardManagerActor = shardManager.underlyingActor();
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Follower.name(), RaftState.Candidate.name()));
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
                 true, shardId));
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
-        final Props persistentProps = newShardMgrProps();
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
+        TestShardManager shardManager = newTestShardManager();
 
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Candidate.name(), RaftState.Follower.name()));
 
         // Initially will be false
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Send status true will make sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
 
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
 
         // Send status false will make sync status false
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
     }
 
     @Test
     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
-        final Props persistentProps = newShardMgrProps(new MockConfiguration() {
+        TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
             @Override
             public List<String> getMemberShardNames(String memberName) {
                 return Arrays.asList("default", "astronauts");
             }
-        });
-
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
-
-        ShardManager shardManagerActor = shardManager.underlyingActor();
+        }));
 
         // Initially will be false
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Make default shard leader
         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(defaultShardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // default = Leader, astronauts is unknown so sync status remains false
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Make astronauts shard leader as well
         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // Now sync status should be true
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
 
         // Make astronauts a Follower
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Leader.name(), RaftState.Follower.name()));
 
         // Sync status is not true
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Make the astronauts follower sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
 
         // Sync status is now true
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
 
     }
 
@@ -1260,7 +1243,8 @@ public class ShardManagerTest extends AbstractActorTest {
                 AddServer.class);
             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
-
+            newReplicaShardManager.underlyingActor()
+                .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
             expectMsgClass(duration("5 seconds"), Status.Success.class);
         }};
 
@@ -1314,13 +1298,45 @@ public class ShardManagerTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testShardPersistenceWithRestoredData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                   put("default", Arrays.asList("member-1", "member-2")).
+                   put("astronauts", Arrays.asList("member-2")).
+                   put("people", Arrays.asList("member-1", "member-2")).build());
+            String[] restoredShards = {"default", "astronauts"};
+            ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+            InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot);
+
+            //create shardManager to come up with restored data
+            TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
+
+            newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
+            LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+            assertEquals("for uninitialized shard", "people", notFound.getShardName());
+
+            //Verify a local shard is created for the restored shards,
+            //although we expect a NotInitializedException for the shards as the actor initialization
+            //message is not sent for them
+            newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+        }};
+    }
+
+
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
-        TestShardManager(String shardMrgIDSuffix) {
-            super(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
-                    datastoreContextFactory(newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build())).
-                    waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()));
+        private TestShardManager(Builder builder) {
+            super(builder);
         }
 
         @Override
@@ -1338,21 +1354,24 @@ public class ShardManagerTest extends AbstractActorTest {
             assertEquals("Recovery complete", true,
                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
         }
-    }
 
-    @SuppressWarnings("serial")
-    static class TestShardManagerCreator implements Creator<TestShardManager> {
-        String shardMrgIDSuffix;
-
-        TestShardManagerCreator(String shardMrgIDSuffix) {
-            this.shardMrgIDSuffix = shardMrgIDSuffix;
+        public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
+            return new Builder(datastoreContextBuilder);
         }
 
-        @Override
-        public TestShardManager create() throws Exception {
-            return new TestShardManager(shardMrgIDSuffix);
-        }
+        private static class Builder extends ShardManager.Builder {
+            Builder(DatastoreContext.Builder datastoreContextBuilder) {
+                cluster(new MockClusterWrapper()).configuration(new MockConfiguration());
+                datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
+                waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
+            }
 
+            @Override
+            public Props props() {
+                verify();
+                return Props.create(TestShardManager.class, this);
+            }
+        }
     }
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
@@ -1377,6 +1396,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private final ActorRef shardActor;
         private final String name;
+        private final CountDownLatch snapshotPersist = new CountDownLatch(1);
+        private ShardManagerSnapshot snapshot;
 
         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
             super(builder);
@@ -1455,6 +1476,18 @@ public class ShardManagerTest extends AbstractActorTest {
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
             findPrimaryMessageReceived = new CountDownLatch(1);
         }
+
+        @Override
+        public void saveSnapshot(Object obj) {
+            snapshot = (ShardManagerSnapshot) obj;
+            snapshotPersist.countDown();
+        }
+
+        void verifySnapshotPersisted(Set<String> shardList) {
+            assertEquals("saveSnapshot invoked", true,
+                Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
+            assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
+        }
     }
 
     private static class MockRespondActor extends MessageCollectorActor {
index bcfbeb4..a2f01c3 100644 (file)
@@ -17,17 +17,17 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Set;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
 public class ConfigurationImplTest {
 
-    private static ConfigurationImpl configuration;
+    private ConfigurationImpl configuration;
 
-    @BeforeClass
-    public static void staticSetup(){
+    @Before
+    public void setup(){
         configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
     }
 
@@ -151,4 +151,38 @@ public class ConfigurationImplTest {
         assertEquals("getUniqueMemberNamesForAllShards", Sets.newHashSet("member-1", "member-2", "member-3"),
                 configuration.getUniqueMemberNamesForAllShards());
     }
+
+    @Test
+    public void testAddMemberReplicaForShard() {
+        configuration.addMemberReplicaForShard("people-1", "member-2");
+        String shardName = configuration.getShardNameForModule("people");
+        assertEquals("ModuleShardName", "people-1", shardName);
+        ShardStrategy shardStrategy = configuration.getStrategyForModule("people");
+        assertEquals("ModuleStrategy", ModuleShardStrategy.class, shardStrategy.getClass());
+        Collection<String> members = configuration.getMembersFromShardName("people-1");
+        assertEquals("Members", ImmutableSortedSet.of("member-1", "member-2"),
+            ImmutableSortedSet.copyOf(members));
+
+        configuration.addMemberReplicaForShard("non-existent", "member-2");
+        Set<String> shardNames = configuration.getAllShardNames();
+        assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"),
+            ImmutableSortedSet.copyOf(shardNames));
+    }
+
+    @Test
+    public void testRemoveMemberReplicaForShard() {
+        configuration.removeMemberReplicaForShard("default", "member-2");
+        String shardName = configuration.getShardNameForModule("default");
+        assertEquals("ModuleShardName", "default", shardName);
+        ShardStrategy shardStrategy = configuration.getStrategyForModule("default");
+        assertNull("ModuleStrategy", shardStrategy);
+        Collection<String> members = configuration.getMembersFromShardName("default");
+        assertEquals("Members", ImmutableSortedSet.of("member-1", "member-3"),
+            ImmutableSortedSet.copyOf(members));
+
+        configuration.removeMemberReplicaForShard("non-existent", "member-2");
+        Set<String> shardNames = configuration.getAllShardNames();
+        assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"),
+            ImmutableSortedSet.copyOf(shardNames));
+    }
 }
index caae615..0d036d3 100644 (file)
@@ -13,6 +13,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -93,4 +94,30 @@ public class MockConfiguration implements Configuration{
     public boolean isShardConfigured(String shardName) {
         return (shardMembers.containsKey(shardName));
     }
+
+    @Override
+    public void addMemberReplicaForShard (String shardName, String newMemberName) {
+        Map<String, List<String>> newShardMembers = new HashMap<>(shardMembers);
+        for(Map.Entry<String, List<String>> shard : newShardMembers.entrySet()) {
+            if (shard.getKey().equals(shardName)) {
+                List<String> replicas = new ArrayList<>(shard.getValue());
+                replicas.add(newMemberName);
+                shard.setValue(replicas);
+                shardMembers = ImmutableMap.<String, List<String>>builder().putAll(newShardMembers).build();
+            }
+        }
+    }
+
+    @Override
+    public void removeMemberReplicaForShard (String shardName, String newMemberName) {
+        Map<String, List<String>> newShardMembers = new HashMap<>(shardMembers);
+        for(Map.Entry<String, List<String>> shard : newShardMembers.entrySet()) {
+            if (shard.getKey().equals(shardName)) {
+                List<String> replicas = new ArrayList<>(shard.getValue());
+                replicas.remove(newMemberName);
+                shard.setValue(replicas);
+                shardMembers = ImmutableMap.<String, List<String>>builder().putAll(newShardMembers).build();
+            }
+        }
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.