BUG 2187 - Persisting shard list in ShardManager 70/29370/7
authorkalaiselvik <Kalaiselvi_K@Dell.com>
Sat, 14 Nov 2015 19:00:42 +0000 (00:30 +0530)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 16 Nov 2015 05:57:33 +0000 (00:57 -0500)
In ShardManager, the local shard list is persisted as a snapshot.
On recovery, persisted shard list is used to create the shards.
During recovery, obtained persisted information is updated to the
configuration so that it is uniformly available to the DatastoreContext.

Incorporated the comments

Also, as localShards are now created after RecoveryCompletion, the
shardManager mbean is associated with the shardManager immediately
after creation. On creating the localShards, the shards addition
is notified to the mbean object.
In the shardManagerTests involving verification of the syncStatus
and CountDownLatch objects, the testcases are made to wait for
localShard creation by waiting for recoveryCompletion message.

Change-Id: I523ed9b14af4b1b6e272f05faac1cf37abfef336
Signed-off-by: kalaiselvik <Kalaiselvi_K@Dell.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ModuleConfig.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java

index 49e1fe32f381ffd338f5b4ba8da035576eaca9fa..98a6090514c9549f2f506c82a85fce7376e35cf6 100644 (file)
@@ -21,6 +21,9 @@ import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
@@ -34,6 +37,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -116,7 +120,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfo mBean;
+    private final ShardManagerInfo mBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
 
     private DatastoreContextFactory datastoreContextFactory;
 
@@ -149,7 +153,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        createLocalShards();
+        List<String> localShardActorNames = new ArrayList<>();
+        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+                localShardActorNames);
+        mBean.setShardManager(this);
     }
 
     @Override
     }
 
     @Override
@@ -199,6 +208,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
+                persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
         } else {
             unknownMessage(message);
         }
@@ -428,6 +442,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
             // journal on upgrade from Helium.
             deleteMessages(lastSequenceNr());
             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
             // journal on upgrade from Helium.
             deleteMessages(lastSequenceNr());
+            createLocalShards();
+        } else if (message instanceof SnapshotOffer) {
+            handleShardRecovery((SnapshotOffer) message);
         }
     }
 
         }
     }
 
@@ -726,20 +743,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         restoreFromSnapshot = null; // null out to GC
 
 
         restoreFromSnapshot = null; // null out to GC
 
-        List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
                         shardSnapshots.get(shardName)), peerAddressResolver));
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
                         shardSnapshots.get(shardName)), peerAddressResolver));
+            mBean.addLocalShard(shardId.toString());
         }
         }
-
-        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
-
-        mBean.setShardManager(this);
     }
 
     /**
     }
 
     /**
@@ -870,6 +881,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
                           getPeerAddresses(shardName), datastoreContext,
                           Shard.builder(), peerAddressResolver);
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
                           getPeerAddresses(shardName), datastoreContext,
                           Shard.builder(), peerAddressResolver);
+        shardInfo.setShardActiveMember(false);
         localShards.put(shardName, shardInfo);
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
 
         localShards.put(shardName, shardInfo);
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
 
@@ -916,6 +928,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+            shardInfo.setShardActiveMember(true);
+            persistShardList();
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
@@ -961,6 +975,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return;
     }
 
         return;
     }
 
+    private void persistShardList() {
+        List<String> shardList = new ArrayList(localShards.keySet());
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isShardActiveMember()) {
+                shardList.remove(shardInfo.getShardName());
+            }
+        }
+        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(new ShardManagerSnapshot(shardList));
+    }
+
+    private void handleShardRecovery(SnapshotOffer offer) {
+        LOG.debug ("{}: in handleShardRecovery", persistenceId());
+        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+        String currentMember = cluster.getCurrentMemberName();
+        Set<String> configuredShardList =
+            new HashSet<>(configuration.getMemberShardNames(currentMember));
+        for (String shard : snapshot.getShardList()) {
+            if (!configuredShardList.contains(shard)) {
+                // add the current member as a replica for the shard
+                LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+                configuration.addMemberReplicaForShard(shard, currentMember);
+            } else {
+                configuredShardList.remove(shard);
+            }
+        }
+        for (String shard : configuredShardList) {
+            // remove the member as a replica for the shard
+            LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+            configuration.removeMemberReplicaForShard(shard, currentMember);
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -984,6 +1031,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private DatastoreContext datastoreContext;
         private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
         private DatastoreContext datastoreContext;
         private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
+        private boolean shardActiveStatus = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
@@ -1182,6 +1230,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
+
+        void setShardActiveMember(boolean flag) {
+            shardActiveStatus = flag;
+        }
+
+        boolean isShardActiveMember() {
+            return shardActiveStatus;
+        }
     }
 
     private static class OnShardInitialized {
     }
 
     private static class OnShardInitialized {
@@ -1307,13 +1363,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return this;
         }
 
             return this;
         }
 
-        public Props props() {
+        protected void verify() {
             sealed = true;
             Preconditions.checkNotNull(cluster, "cluster should not be null");
             Preconditions.checkNotNull(configuration, "configuration should not be null");
             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
             sealed = true;
             Preconditions.checkNotNull(cluster, "cluster should not be null");
             Preconditions.checkNotNull(configuration, "configuration should not be null");
             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+        }
+
+        public Props props() {
+            verify();
             return Props.create(ShardManager.class, this);
         }
     }
             return Props.create(ShardManager.class, this);
         }
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java
new file mode 100644 (file)
index 0000000..4cc54dd
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Dell Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+/**
+ * Persisted data of the ShardManager
+ */
+
+public class ShardManagerSnapshot implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final List<String> shardList;
+
+    public ShardManagerSnapshot(@Nonnull List<String> shardList) {
+        this.shardList = new ArrayList<>(Preconditions.checkNotNull(shardList));
+    }
+
+    public List<String> getShardList() {
+        return this.shardList;
+    }
+
+    @Override
+    public String toString() {
+        return "ShardManagerSnapshot [ShardList = " + shardList + " ]";
+    }
+}
index dea77f5e34ed0cc46306bd108f5fcc6ed0580c11..b6122b3d29f4f07eada831feaa5765c3a073047f 100644 (file)
@@ -60,4 +60,14 @@ public interface Configuration {
      * Verifies if the given module shard in available in the cluster
      */
     boolean isShardConfigured(String shardName);
      * Verifies if the given module shard in available in the cluster
      */
     boolean isShardConfigured(String shardName);
+
+    /**
+     * Adds the given member as the new replica for the given shardName
+     */
+    void addMemberReplicaForShard (String shardName, String memberName);
+
+    /**
+     * Removes the given member as a replica for the given shardName
+     */
+    void removeMemberReplicaForShard (String shardName, String memberName);
 }
 }
index d553c66dbe69d072b93617b38937d8e213395e61..b7bcc59f2ffd9e6c4f0a3eff0a8dc0afbf69e164 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -157,4 +158,47 @@ public class ConfigurationImpl implements Configuration {
         Preconditions.checkNotNull(shardName, "shardName should not be null");
         return allShardNames.contains(shardName);
     }
         Preconditions.checkNotNull(shardName, "shardName should not be null");
         return allShardNames.contains(shardName);
     }
+
+    @Override
+    public void addMemberReplicaForShard (String shardName, String newMemberName) {
+        Preconditions.checkNotNull(shardName, "shardName should not be null");
+        Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
+
+        for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
+            ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
+            if(shardConfig != null) {
+                ModuleConfig newModuleConfig = new ModuleConfig(moduleConfig);
+                Set<String> replica = new HashSet<>(shardConfig.getReplicas());
+                replica.add(newMemberName);
+                newModuleConfig.addShardConfig(shardName, ImmutableSet.copyOf(replica));
+                updateModuleConfigMap(newModuleConfig);
+                return;
+            }
+        }
+    }
+
+    @Override
+    public void removeMemberReplicaForShard (String shardName, String newMemberName) {
+        Preconditions.checkNotNull(shardName, "shardName should not be null");
+        Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
+
+        for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
+            ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
+            if(shardConfig != null) {
+                ModuleConfig newModuleConfig = new ModuleConfig(moduleConfig);
+                Set<String> replica = new HashSet<>(shardConfig.getReplicas());
+                replica.remove(newMemberName);
+                newModuleConfig.addShardConfig(shardName, ImmutableSet.copyOf(replica));
+                updateModuleConfigMap(newModuleConfig);
+                return;
+            }
+        }
+    }
+
+    private void updateModuleConfigMap(ModuleConfig moduleConfig) {
+        HashMap<String, ModuleConfig> newModuleConfigMap = new HashMap<>(moduleConfigMap);
+        newModuleConfigMap.put(moduleConfig.getName(), moduleConfig);
+        moduleConfigMap = ImmutableMap.<String, ModuleConfig>builder().putAll(newModuleConfigMap).build();
+        return;
+    }
 }
 }
index 1a309a06c0bfadc56f8104817b56daa1e832b844..b54946774d2a4bc7cd266bf93370a91f5ccf0d77 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.config;
 
  */
 package org.opendaylight.controller.cluster.datastore.config;
 
+import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +29,16 @@ public class ModuleConfig {
         this.name = name;
     }
 
         this.name = name;
     }
 
+    public ModuleConfig(ModuleConfig moduleConfig) {
+        this.name = moduleConfig.getName();
+        this.nameSpace = moduleConfig.getNameSpace();
+        this.shardStrategy = moduleConfig.getShardStrategy();
+        for (ShardConfig shardConfig : moduleConfig.getShardConfigs()) {
+            shardConfigs.put(shardConfig.getName(), new ShardConfig(shardConfig.getName(),
+                ImmutableSet.copyOf(shardConfig.getReplicas())));
+        }
+    }
+
     public String getName() {
         return name;
     }
     public String getName() {
         return name;
     }
@@ -63,4 +74,8 @@ public class ModuleConfig {
     public void setShardStrategy(ShardStrategy shardStrategy) {
         this.shardStrategy = shardStrategy;
     }
     public void setShardStrategy(ShardStrategy shardStrategy) {
         this.shardStrategy = shardStrategy;
     }
-}
\ No newline at end of file
+
+    public ShardConfig removeShardConfig(String name) {
+        return shardConfigs.remove(name);
+    }
+}
index 29cb189fdaf252dd1d533e5a0020eb71784d0a0c..b6ea14ff95e75653d6c6d2ae4ce559261a75674f 100644 (file)
@@ -100,6 +100,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -137,6 +138,7 @@ public class ShardManagerTest extends AbstractActorTest {
         MockitoAnnotations.initMocks(this);
 
         InMemoryJournal.clear();
         MockitoAnnotations.initMocks(this);
 
         InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
 
         if(mockShardActor == null) {
             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
 
         if(mockShardActor == null) {
             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
@@ -149,6 +151,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @After
     public void tearDown() {
         InMemoryJournal.clear();
     @After
     public void tearDown() {
         InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
     private Props newShardMgrProps() {
     }
 
     private Props newShardMgrProps() {
@@ -163,9 +166,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps(Configuration config) {
     }
 
     private Props newShardMgrProps(Configuration config) {
-        return ShardManager.builder().cluster(new MockClusterWrapper()).configuration(config).
-                datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
-                waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache).props();
+        return TestShardManager.builder(datastoreContextBuilder).configuration(config).props();
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -188,6 +189,17 @@ public class ShardManagerTest extends AbstractActorTest {
         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
+    private TestShardManager newTestShardManager() {
+        return newTestShardManager(newShardMgrProps());
+    }
+
+    private TestShardManager newTestShardManager(Props props) {
+        TestActorRef<TestShardManager> shardManagerActor = TestActorRef.create(getSystem(), props);
+        TestShardManager shardManager = shardManagerActor.underlyingActor();
+        shardManager.waitForRecoveryComplete();
+        return shardManager;
+    }
+
     @Test
     public void testPerShardDatastoreContext() throws Exception {
         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
     @Test
     public void testPerShardDatastoreContext() throws Exception {
         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
@@ -846,8 +858,7 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
 
         new JavaTestKit(getSystem()) {{
         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
 
         new JavaTestKit(getSystem()) {{
-            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
-                    Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
             shardManager.underlyingActor().waitForRecoveryComplete();
             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
 
             shardManager.underlyingActor().waitForRecoveryComplete();
             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
@@ -862,196 +873,168 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
 
     @Test
     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
 
-                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
+        String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+        shardManager.onReceiveCommand(new RoleChangeNotification(
+                memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
 
 
-                verify(ready, never()).countDown();
+        verify(ready, never()).countDown();
 
 
-                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
-                        Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
+        shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+                Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
 
 
-                verify(ready, times(1)).countDown();
-
-            }};
+        verify(ready, times(1)).countDown();
     }
 
     @Test
     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
     }
 
     @Test
     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
-
-                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        memberId, null, RaftState.Follower.name()));
+        new JavaTestKit(getSystem()) {{
+            TestShardManager shardManager = newTestShardManager();
 
 
-                verify(ready, never()).countDown();
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.onReceiveCommand(new RoleChangeNotification(
+                    memberId, null, RaftState.Follower.name()));
 
 
-                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+            verify(ready, never()).countDown();
 
 
-                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
-                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
-                        DataStoreVersions.CURRENT_VERSION));
+            shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
 
 
-                verify(ready, times(1)).countDown();
+            shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                    "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+                    DataStoreVersions.CURRENT_VERSION));
 
 
-            }};
+            verify(ready, times(1)).countDown();
+        }};
     }
 
     @Test
     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
     }
 
     @Test
     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
-
-                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        memberId, null, RaftState.Follower.name()));
+        new JavaTestKit(getSystem()) {{
+            TestShardManager shardManager = newTestShardManager();
 
 
-                verify(ready, never()).countDown();
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
 
 
-                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
-                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
-                        DataStoreVersions.CURRENT_VERSION));
+            verify(ready, never()).countDown();
 
 
-                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+            shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                    "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+                    DataStoreVersions.CURRENT_VERSION));
 
 
-                verify(ready, times(1)).countDown();
+            shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
 
 
-            }};
+            verify(ready, times(1)).countDown();
+        }};
     }
 
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
     }
 
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
 
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
-                        "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+        shardManager.onReceiveCommand(new RoleChangeNotification(
+                "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
 
 
-                verify(ready, never()).countDown();
-
-            }};
+        verify(ready, never()).countDown();
     }
 
     }
 
-
     @Test
     public void testByDefaultSyncStatusIsFalse() throws Exception{
     @Test
     public void testByDefaultSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = newShardMgrProps();
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
-
-        ShardManager shardManagerActor = shardManager.underlyingActor();
+        TestShardManager shardManager = newTestShardManager();
 
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
     }
 
     @Test
     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
-        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
 
-        ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+        shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
     }
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
-        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
 
-        ShardManager shardManagerActor = shardManager.underlyingActor();
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Follower.name(), RaftState.Candidate.name()));
 
                 RaftState.Follower.name(), RaftState.Candidate.name()));
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
 
         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
                 true, shardId));
 
                 true, shardId));
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
     }
 
     @Test
     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
-        final Props persistentProps = newShardMgrProps();
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
+        TestShardManager shardManager = newTestShardManager();
 
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
 
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Candidate.name(), RaftState.Follower.name()));
 
         // Initially will be false
                 RaftState.Candidate.name(), RaftState.Follower.name()));
 
         // Initially will be false
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Send status true will make sync status true
 
         // Send status true will make sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
 
 
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
 
         // Send status false will make sync status false
 
         // Send status false will make sync status false
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
 
 
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
     }
 
     @Test
     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
 
     }
 
     @Test
     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
-        final Props persistentProps = newShardMgrProps(new MockConfiguration() {
+        TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
             @Override
             public List<String> getMemberShardNames(String memberName) {
                 return Arrays.asList("default", "astronauts");
             }
             @Override
             public List<String> getMemberShardNames(String memberName) {
                 return Arrays.asList("default", "astronauts");
             }
-        });
-
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
-
-        ShardManager shardManagerActor = shardManager.underlyingActor();
+        }));
 
         // Initially will be false
 
         // Initially will be false
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Make default shard leader
         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
 
         // Make default shard leader
         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(defaultShardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // default = Leader, astronauts is unknown so sync status remains false
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // default = Leader, astronauts is unknown so sync status remains false
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Make astronauts shard leader as well
         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
 
         // Make astronauts shard leader as well
         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // Now sync status should be true
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // Now sync status should be true
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
 
         // Make astronauts a Follower
 
         // Make astronauts a Follower
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+        shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Leader.name(), RaftState.Follower.name()));
 
         // Sync status is not true
                 RaftState.Leader.name(), RaftState.Follower.name()));
 
         // Sync status is not true
-        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(false, shardManager.getMBean().getSyncStatus());
 
         // Make the astronauts follower sync status true
 
         // Make the astronauts follower sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
+        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
 
         // Sync status is now true
 
         // Sync status is now true
-        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+        assertEquals(true, shardManager.getMBean().getSyncStatus());
 
     }
 
 
     }
 
@@ -1260,7 +1243,8 @@ public class ShardManagerTest extends AbstractActorTest {
                 AddServer.class);
             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
                 AddServer.class);
             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
-
+            newReplicaShardManager.underlyingActor()
+                .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
             expectMsgClass(duration("5 seconds"), Status.Success.class);
         }};
 
             expectMsgClass(duration("5 seconds"), Status.Success.class);
         }};
 
@@ -1314,13 +1298,45 @@ public class ShardManagerTest extends AbstractActorTest {
 
     }
 
 
     }
 
+    @Test
+    public void testShardPersistenceWithRestoredData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                   put("default", Arrays.asList("member-1", "member-2")).
+                   put("astronauts", Arrays.asList("member-2")).
+                   put("people", Arrays.asList("member-1", "member-2")).build());
+            String[] restoredShards = {"default", "astronauts"};
+            ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+            InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot);
+
+            //create shardManager to come up with restored data
+            TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
+
+            newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
+            LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+            assertEquals("for uninitialized shard", "people", notFound.getShardName());
+
+            //Verify a local shard is created for the restored shards,
+            //although we expect a NotInitializedException for the shards as the actor initialization
+            //message is not sent for them
+            newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+        }};
+    }
+
+
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
-        TestShardManager(String shardMrgIDSuffix) {
-            super(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
-                    datastoreContextFactory(newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build())).
-                    waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()));
+        private TestShardManager(Builder builder) {
+            super(builder);
         }
 
         @Override
         }
 
         @Override
@@ -1338,21 +1354,24 @@ public class ShardManagerTest extends AbstractActorTest {
             assertEquals("Recovery complete", true,
                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
         }
             assertEquals("Recovery complete", true,
                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
         }
-    }
 
 
-    @SuppressWarnings("serial")
-    static class TestShardManagerCreator implements Creator<TestShardManager> {
-        String shardMrgIDSuffix;
-
-        TestShardManagerCreator(String shardMrgIDSuffix) {
-            this.shardMrgIDSuffix = shardMrgIDSuffix;
+        public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
+            return new Builder(datastoreContextBuilder);
         }
 
         }
 
-        @Override
-        public TestShardManager create() throws Exception {
-            return new TestShardManager(shardMrgIDSuffix);
-        }
+        private static class Builder extends ShardManager.Builder {
+            Builder(DatastoreContext.Builder datastoreContextBuilder) {
+                cluster(new MockClusterWrapper()).configuration(new MockConfiguration());
+                datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
+                waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
+            }
 
 
+            @Override
+            public Props props() {
+                verify();
+                return Props.create(TestShardManager.class, this);
+            }
+        }
     }
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
     }
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
@@ -1377,6 +1396,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private final ActorRef shardActor;
         private final String name;
         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private final ActorRef shardActor;
         private final String name;
+        private final CountDownLatch snapshotPersist = new CountDownLatch(1);
+        private ShardManagerSnapshot snapshot;
 
         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
             super(builder);
 
         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
             super(builder);
@@ -1455,6 +1476,18 @@ public class ShardManagerTest extends AbstractActorTest {
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
             findPrimaryMessageReceived = new CountDownLatch(1);
         }
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
             findPrimaryMessageReceived = new CountDownLatch(1);
         }
+
+        @Override
+        public void saveSnapshot(Object obj) {
+            snapshot = (ShardManagerSnapshot) obj;
+            snapshotPersist.countDown();
+        }
+
+        void verifySnapshotPersisted(Set<String> shardList) {
+            assertEquals("saveSnapshot invoked", true,
+                Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
+            assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
+        }
     }
 
     private static class MockRespondActor extends MessageCollectorActor {
     }
 
     private static class MockRespondActor extends MessageCollectorActor {
index bcfbeb4b584c84ba266c0d84c7dc729c1b0a5ccb..a2f01c377e9a48f3dd4a19092f0d241aefede959 100644 (file)
@@ -17,17 +17,17 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Set;
 import org.junit.Assert;
 import java.util.Collection;
 import java.util.Set;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
 public class ConfigurationImplTest {
 
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
 public class ConfigurationImplTest {
 
-    private static ConfigurationImpl configuration;
+    private ConfigurationImpl configuration;
 
 
-    @BeforeClass
-    public static void staticSetup(){
+    @Before
+    public void setup(){
         configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
     }
 
         configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
     }
 
@@ -151,4 +151,38 @@ public class ConfigurationImplTest {
         assertEquals("getUniqueMemberNamesForAllShards", Sets.newHashSet("member-1", "member-2", "member-3"),
                 configuration.getUniqueMemberNamesForAllShards());
     }
         assertEquals("getUniqueMemberNamesForAllShards", Sets.newHashSet("member-1", "member-2", "member-3"),
                 configuration.getUniqueMemberNamesForAllShards());
     }
+
+    @Test
+    public void testAddMemberReplicaForShard() {
+        configuration.addMemberReplicaForShard("people-1", "member-2");
+        String shardName = configuration.getShardNameForModule("people");
+        assertEquals("ModuleShardName", "people-1", shardName);
+        ShardStrategy shardStrategy = configuration.getStrategyForModule("people");
+        assertEquals("ModuleStrategy", ModuleShardStrategy.class, shardStrategy.getClass());
+        Collection<String> members = configuration.getMembersFromShardName("people-1");
+        assertEquals("Members", ImmutableSortedSet.of("member-1", "member-2"),
+            ImmutableSortedSet.copyOf(members));
+
+        configuration.addMemberReplicaForShard("non-existent", "member-2");
+        Set<String> shardNames = configuration.getAllShardNames();
+        assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"),
+            ImmutableSortedSet.copyOf(shardNames));
+    }
+
+    @Test
+    public void testRemoveMemberReplicaForShard() {
+        configuration.removeMemberReplicaForShard("default", "member-2");
+        String shardName = configuration.getShardNameForModule("default");
+        assertEquals("ModuleShardName", "default", shardName);
+        ShardStrategy shardStrategy = configuration.getStrategyForModule("default");
+        assertNull("ModuleStrategy", shardStrategy);
+        Collection<String> members = configuration.getMembersFromShardName("default");
+        assertEquals("Members", ImmutableSortedSet.of("member-1", "member-3"),
+            ImmutableSortedSet.copyOf(members));
+
+        configuration.removeMemberReplicaForShard("non-existent", "member-2");
+        Set<String> shardNames = configuration.getAllShardNames();
+        assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"),
+            ImmutableSortedSet.copyOf(shardNames));
+    }
 }
 }
index caae615ae6b9aa7a3ef6f374623eb240c4714d51..0d036d36992c776748d7af786ef89ac0119e55ab 100644 (file)
@@ -13,6 +13,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -93,4 +94,30 @@ public class MockConfiguration implements Configuration{
     public boolean isShardConfigured(String shardName) {
         return (shardMembers.containsKey(shardName));
     }
     public boolean isShardConfigured(String shardName) {
         return (shardMembers.containsKey(shardName));
     }
+
+    @Override
+    public void addMemberReplicaForShard (String shardName, String newMemberName) {
+        Map<String, List<String>> newShardMembers = new HashMap<>(shardMembers);
+        for(Map.Entry<String, List<String>> shard : newShardMembers.entrySet()) {
+            if (shard.getKey().equals(shardName)) {
+                List<String> replicas = new ArrayList<>(shard.getValue());
+                replicas.add(newMemberName);
+                shard.setValue(replicas);
+                shardMembers = ImmutableMap.<String, List<String>>builder().putAll(newShardMembers).build();
+            }
+        }
+    }
+
+    @Override
+    public void removeMemberReplicaForShard (String shardName, String newMemberName) {
+        Map<String, List<String>> newShardMembers = new HashMap<>(shardMembers);
+        for(Map.Entry<String, List<String>> shard : newShardMembers.entrySet()) {
+            if (shard.getKey().equals(shardName)) {
+                List<String> replicas = new ArrayList<>(shard.getValue());
+                replicas.remove(newMemberName);
+                shard.setValue(replicas);
+                shardMembers = ImmutableMap.<String, List<String>>builder().putAll(newShardMembers).build();
+            }
+        }
+    }
 }
 }