From: kalaiselvik Date: Sat, 14 Nov 2015 19:00:42 +0000 (+0530) Subject: BUG 2187 - Persisting shard list in ShardManager X-Git-Tag: release/beryllium~159 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F70%2F29370%2F7 BUG 2187 - Persisting shard list in ShardManager In ShardManager, the local shard list is persisted as a snapshot. On recovery, persisted shard list is used to create the shards. During recovery, obtained persisted information is updated to the configuration so that it is uniformly available to the DatastoreContext. Incorporated the comments Also, as localShards are now created after RecoveryCompletion, the shardManager mbean is associated with the shardManager immediately after creation. On creating the localShards, the shards addition is notified to the mbean object. In the shardManagerTests involving verification of the syncStatus and CountDownLatch objects, the testcases are made to wait for localShard creation by waiting for recoveryCompletion message. Change-Id: I523ed9b14af4b1b6e272f05faac1cf37abfef336 Signed-off-by: kalaiselvik --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 49e1fe32f3..98a6090514 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -21,6 +21,9 @@ import akka.cluster.ClusterEvent; import akka.dispatch.OnComplete; import akka.japi.Function; import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; @@ -34,6 +37,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -116,7 +120,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String shardDispatcherPath; - private ShardManagerInfo mBean; + private final ShardManagerInfo mBean; private DatastoreContextFactory datastoreContextFactory; @@ -149,7 +153,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - createLocalShards(); + List localShardActorNames = new ArrayList<>(); + mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(), + "shard-manager-" + this.type, + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), + localShardActorNames); + mBean.setShardManager(this); } @Override @@ -199,6 +208,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onRemoveShardReplica((RemoveShardReplica)message); } else if(message instanceof GetSnapshot) { onGetSnapshot(); + } else if (message instanceof SaveSnapshotSuccess) { + LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId()); + } else if (message instanceof SaveSnapshotFailure) { + LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards", + persistenceId(), ((SaveSnapshotFailure)message).cause()); } else { unknownMessage(message); } @@ -428,6 +442,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // We no longer persist SchemaContext modules so delete all the prior messages from the akka // journal on upgrade from Helium. deleteMessages(lastSequenceNr()); + createLocalShards(); + } else if (message instanceof SnapshotOffer) { + handleShardRecovery((SnapshotOffer) message); } } @@ -726,20 +743,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { restoreFromSnapshot = null; // null out to GC - List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); - localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( shardSnapshots.get(shardName)), peerAddressResolver)); + mBean.addLocalShard(shardId.toString()); } - - mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, - datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames); - - mBean.setShardManager(this); } /** @@ -870,6 +881,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ShardInformation shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, Shard.builder(), peerAddressResolver); + shardInfo.setShardActiveMember(false); localShards.put(shardName, shardInfo); shardInfo.setActor(newShardActor(schemaContext, shardInfo)); @@ -916,6 +928,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Make the local shard voting capable shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf()); + shardInfo.setShardActiveMember(true); + persistShardList(); mBean.addLocalShard(shardInfo.getShardId().toString()); sender.tell(new akka.actor.Status.Success(true), getSelf()); @@ -961,6 +975,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } + private void persistShardList() { + List shardList = new ArrayList(localShards.keySet()); + for (ShardInformation shardInfo : localShards.values()) { + if (!shardInfo.isShardActiveMember()) { + shardList.remove(shardInfo.getShardName()); + } + } + LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList); + saveSnapshot(new ShardManagerSnapshot(shardList)); + } + + private void handleShardRecovery(SnapshotOffer offer) { + LOG.debug ("{}: in handleShardRecovery", persistenceId()); + ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot(); + String currentMember = cluster.getCurrentMemberName(); + Set configuredShardList = + new HashSet<>(configuration.getMemberShardNames(currentMember)); + for (String shard : snapshot.getShardList()) { + if (!configuredShardList.contains(shard)) { + // add the current member as a replica for the shard + LOG.debug ("{}: adding shard {}", persistenceId(), shard); + configuration.addMemberReplicaForShard(shard, currentMember); + } else { + configuredShardList.remove(shard); + } + } + for (String shard : configuredShardList) { + // remove the member as a replica for the shard + LOG.debug ("{}: removing shard {}", persistenceId(), shard); + configuration.removeMemberReplicaForShard(shard, currentMember); + } + } + @VisibleForTesting protected static class ShardInformation { private final ShardIdentifier shardId; @@ -984,6 +1031,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContext datastoreContext; private Shard.AbstractBuilder builder; private final ShardPeerAddressResolver addressResolver; + private boolean shardActiveStatus = true; private ShardInformation(String shardName, ShardIdentifier shardId, Map initialPeerAddresses, DatastoreContext datastoreContext, @@ -1182,6 +1230,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderVersion(short leaderVersion) { this.leaderVersion = leaderVersion; } + + void setShardActiveMember(boolean flag) { + shardActiveStatus = flag; + } + + boolean isShardActiveMember() { + return shardActiveStatus; + } } private static class OnShardInitialized { @@ -1307,13 +1363,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return this; } - public Props props() { + protected void verify() { sealed = true; Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); + } + + public Props props() { + verify(); return Props.create(ShardManager.class, this); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java new file mode 100644 index 0000000000..4cc54dd8e3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2015 Dell Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nonnull; + +/** + * Persisted data of the ShardManager + */ + +public class ShardManagerSnapshot implements Serializable { + private static final long serialVersionUID = 1L; + private final List shardList; + + public ShardManagerSnapshot(@Nonnull List shardList) { + this.shardList = new ArrayList<>(Preconditions.checkNotNull(shardList)); + } + + public List getShardList() { + return this.shardList; + } + + @Override + public String toString() { + return "ShardManagerSnapshot [ShardList = " + shardList + " ]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java index dea77f5e34..b6122b3d29 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java @@ -60,4 +60,14 @@ public interface Configuration { * Verifies if the given module shard in available in the cluster */ boolean isShardConfigured(String shardName); + + /** + * Adds the given member as the new replica for the given shardName + */ + void addMemberReplicaForShard (String shardName, String memberName); + + /** + * Removes the given member as a replica for the given shardName + */ + void removeMemberReplicaForShard (String shardName, String memberName); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java index d553c66dbe..b7bcc59f2f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java @@ -14,6 +14,7 @@ import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -157,4 +158,47 @@ public class ConfigurationImpl implements Configuration { Preconditions.checkNotNull(shardName, "shardName should not be null"); return allShardNames.contains(shardName); } + + @Override + public void addMemberReplicaForShard (String shardName, String newMemberName) { + Preconditions.checkNotNull(shardName, "shardName should not be null"); + Preconditions.checkNotNull(newMemberName, "MemberName should not be null"); + + for(ModuleConfig moduleConfig: moduleConfigMap.values()) { + ShardConfig shardConfig = moduleConfig.getShardConfig(shardName); + if(shardConfig != null) { + ModuleConfig newModuleConfig = new ModuleConfig(moduleConfig); + Set replica = new HashSet<>(shardConfig.getReplicas()); + replica.add(newMemberName); + newModuleConfig.addShardConfig(shardName, ImmutableSet.copyOf(replica)); + updateModuleConfigMap(newModuleConfig); + return; + } + } + } + + @Override + public void removeMemberReplicaForShard (String shardName, String newMemberName) { + Preconditions.checkNotNull(shardName, "shardName should not be null"); + Preconditions.checkNotNull(newMemberName, "MemberName should not be null"); + + for(ModuleConfig moduleConfig: moduleConfigMap.values()) { + ShardConfig shardConfig = moduleConfig.getShardConfig(shardName); + if(shardConfig != null) { + ModuleConfig newModuleConfig = new ModuleConfig(moduleConfig); + Set replica = new HashSet<>(shardConfig.getReplicas()); + replica.remove(newMemberName); + newModuleConfig.addShardConfig(shardName, ImmutableSet.copyOf(replica)); + updateModuleConfigMap(newModuleConfig); + return; + } + } + } + + private void updateModuleConfigMap(ModuleConfig moduleConfig) { + HashMap newModuleConfigMap = new HashMap<>(moduleConfigMap); + newModuleConfigMap.put(moduleConfig.getName(), moduleConfig); + moduleConfigMap = ImmutableMap.builder().putAll(newModuleConfigMap).build(); + return; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ModuleConfig.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ModuleConfig.java index 1a309a06c0..b54946774d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ModuleConfig.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ModuleConfig.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore.config; +import com.google.common.collect.ImmutableSet; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -28,6 +29,16 @@ public class ModuleConfig { this.name = name; } + public ModuleConfig(ModuleConfig moduleConfig) { + this.name = moduleConfig.getName(); + this.nameSpace = moduleConfig.getNameSpace(); + this.shardStrategy = moduleConfig.getShardStrategy(); + for (ShardConfig shardConfig : moduleConfig.getShardConfigs()) { + shardConfigs.put(shardConfig.getName(), new ShardConfig(shardConfig.getName(), + ImmutableSet.copyOf(shardConfig.getReplicas()))); + } + } + public String getName() { return name; } @@ -63,4 +74,8 @@ public class ModuleConfig { public void setShardStrategy(ShardStrategy shardStrategy) { this.shardStrategy = shardStrategy; } -} \ No newline at end of file + + public ShardConfig removeShardConfig(String name) { + return shardConfigs.remove(name); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 29cb189fda..b6ea14ff95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -100,6 +100,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -137,6 +138,7 @@ public class ShardManagerTest extends AbstractActorTest { MockitoAnnotations.initMocks(this); InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); if(mockShardActor == null) { mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); @@ -149,6 +151,7 @@ public class ShardManagerTest extends AbstractActorTest { @After public void tearDown() { InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } private Props newShardMgrProps() { @@ -163,9 +166,7 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps(Configuration config) { - return ShardManager.builder().cluster(new MockClusterWrapper()).configuration(config). - datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())). - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache).props(); + return TestShardManager.builder(datastoreContextBuilder).configuration(config).props(); } private Props newPropsShardMgrWithMockShardActor() { @@ -188,6 +189,17 @@ public class ShardManagerTest extends AbstractActorTest { return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } + private TestShardManager newTestShardManager() { + return newTestShardManager(newShardMgrProps()); + } + + private TestShardManager newTestShardManager(Props props) { + TestActorRef shardManagerActor = TestActorRef.create(getSystem(), props); + TestShardManager shardManager = shardManagerActor.underlyingActor(); + shardManager.waitForRecoveryComplete(); + return shardManager; + } + @Test public void testPerShardDatastoreContext() throws Exception { final DatastoreContextFactory mockFactory = newDatastoreContextFactory( @@ -846,8 +858,7 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), - Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); shardManager.underlyingActor().waitForRecoveryComplete(); InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); @@ -862,196 +873,168 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); - verify(ready, never()).countDown(); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, - Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); - verify(ready, times(1)).countDown(); - - }}; + verify(ready, times(1)).countDown(); } @Test public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, null, RaftState.Follower.name())); + new JavaTestKit(getSystem()) {{ + TestShardManager shardManager = newTestShardManager(); - verify(ready, never()).countDown(); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); - shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), - DataStoreVersions.CURRENT_VERSION)); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); - verify(ready, times(1)).countDown(); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); - }}; + verify(ready, times(1)).countDown(); + }}; } @Test public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, null, RaftState.Follower.name())); + new JavaTestKit(getSystem()) {{ + TestShardManager shardManager = newTestShardManager(); - verify(ready, never()).countDown(); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), - DataStoreVersions.CURRENT_VERSION)); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); - verify(ready, times(1)).countDown(); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); - }}; + verify(ready, times(1)).countDown(); + }}; } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); - verify(ready, never()).countDown(); - - }}; + verify(ready, never()).countDown(); } - @Test public void testByDefaultSyncStatusIsFalse() throws Exception{ - final Props persistentProps = newShardMgrProps(); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); - - ShardManager shardManagerActor = shardManager.underlyingActor(); + TestShardManager shardManager = newTestShardManager(); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{ - final TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - ShardManager shardManagerActor = shardManager.underlyingActor(); - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, RaftState.Follower.name(), RaftState.Leader.name())); - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ - final TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - ShardManager shardManagerActor = shardManager.underlyingActor(); String shardId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId, + shardManager.onReceiveCommand(new RoleChangeNotification(shardId, RaftState.Follower.name(), RaftState.Candidate.name())); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus( + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus( true, shardId)); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{ - final Props persistentProps = newShardMgrProps(); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestShardManager shardManager = newTestShardManager(); String shardId = "member-1-shard-default-" + shardMrgIDSuffix; - ShardManager shardManagerActor = shardManager.underlyingActor(); - shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId, + shardManager.onReceiveCommand(new RoleChangeNotification(shardId, RaftState.Candidate.name(), RaftState.Follower.name())); // Initially will be false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Send status true will make sync status true - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId)); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId)); - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); // Send status false will make sync status false - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId)); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId)); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ - final Props persistentProps = newShardMgrProps(new MockConfiguration() { + TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() { @Override public List getMemberShardNames(String memberName) { return Arrays.asList("default", "astronauts"); } - }); - - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); - - ShardManager shardManagerActor = shardManager.underlyingActor(); + })); // Initially will be false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make default shard leader String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManagerActor.onReceiveCommand(new RoleChangeNotification(defaultShardId, + shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId, RaftState.Follower.name(), RaftState.Leader.name())); // default = Leader, astronauts is unknown so sync status remains false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make astronauts shard leader as well String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix; - shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId, + shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, RaftState.Follower.name(), RaftState.Leader.name())); // Now sync status should be true - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); // Make astronauts a Follower - shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId, + shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, RaftState.Leader.name(), RaftState.Follower.name())); // Sync status is not true - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make the astronauts follower sync status true - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId)); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId)); // Sync status is now true - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); } @@ -1260,7 +1243,8 @@ public class ShardManagerTest extends AbstractActorTest { AddServer.class); String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); - + newReplicaShardManager.underlyingActor() + .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts")); expectMsgClass(duration("5 seconds"), Status.Success.class); }}; @@ -1314,13 +1298,45 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testShardPersistenceWithRestoredData() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")). + put("people", Arrays.asList("member-1", "member-2")).build()); + String[] restoredShards = {"default", "astronauts"}; + ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot); + + //create shardManager to come up with restored data + TestActorRef newRestoredShardManager = TestActorRef.create(getSystem(), + newShardMgrProps(mockConfig)); + + newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); + + newRestoredShardManager.tell(new FindLocalShard("people", false), getRef()); + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + assertEquals("for uninitialized shard", "people", notFound.getShardName()); + + //Verify a local shard is created for the restored shards, + //although we expect a NotInitializedException for the shards as the actor initialization + //message is not sent for them + newRestoredShardManager.tell(new FindLocalShard("default", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; + } + + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); - TestShardManager(String shardMrgIDSuffix) { - super(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(new MockConfiguration()). - datastoreContextFactory(newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build())). - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache())); + private TestShardManager(Builder builder) { + super(builder); } @Override @@ -1338,21 +1354,24 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } - } - @SuppressWarnings("serial") - static class TestShardManagerCreator implements Creator { - String shardMrgIDSuffix; - - TestShardManagerCreator(String shardMrgIDSuffix) { - this.shardMrgIDSuffix = shardMrgIDSuffix; + public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { + return new Builder(datastoreContextBuilder); } - @Override - public TestShardManager create() throws Exception { - return new TestShardManager(shardMrgIDSuffix); - } + private static class Builder extends ShardManager.Builder { + Builder(DatastoreContext.Builder datastoreContextBuilder) { + cluster(new MockClusterWrapper()).configuration(new MockConfiguration()); + datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); + waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()); + } + @Override + public Props props() { + verify(); + return Props.create(TestShardManager.class, this); + } + } } private static class DelegatingShardManagerCreator implements Creator { @@ -1377,6 +1396,8 @@ public class ShardManagerTest extends AbstractActorTest { private CountDownLatch memberReachableReceived = new CountDownLatch(1); private final ActorRef shardActor; private final String name; + private final CountDownLatch snapshotPersist = new CountDownLatch(1); + private ShardManagerSnapshot snapshot; public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) { super(builder); @@ -1455,6 +1476,18 @@ public class ShardManagerTest extends AbstractActorTest { Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); findPrimaryMessageReceived = new CountDownLatch(1); } + + @Override + public void saveSnapshot(Object obj) { + snapshot = (ShardManagerSnapshot) obj; + snapshotPersist.countDown(); + } + + void verifySnapshotPersisted(Set shardList) { + assertEquals("saveSnapshot invoked", true, + Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); + assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); + } } private static class MockRespondActor extends MessageCollectorActor { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImplTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImplTest.java index bcfbeb4b58..a2f01c377e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImplTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImplTest.java @@ -17,17 +17,17 @@ import java.net.URI; import java.util.Collection; import java.util.Set; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; public class ConfigurationImplTest { - private static ConfigurationImpl configuration; + private ConfigurationImpl configuration; - @BeforeClass - public static void staticSetup(){ + @Before + public void setup(){ configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); } @@ -151,4 +151,38 @@ public class ConfigurationImplTest { assertEquals("getUniqueMemberNamesForAllShards", Sets.newHashSet("member-1", "member-2", "member-3"), configuration.getUniqueMemberNamesForAllShards()); } + + @Test + public void testAddMemberReplicaForShard() { + configuration.addMemberReplicaForShard("people-1", "member-2"); + String shardName = configuration.getShardNameForModule("people"); + assertEquals("ModuleShardName", "people-1", shardName); + ShardStrategy shardStrategy = configuration.getStrategyForModule("people"); + assertEquals("ModuleStrategy", ModuleShardStrategy.class, shardStrategy.getClass()); + Collection members = configuration.getMembersFromShardName("people-1"); + assertEquals("Members", ImmutableSortedSet.of("member-1", "member-2"), + ImmutableSortedSet.copyOf(members)); + + configuration.addMemberReplicaForShard("non-existent", "member-2"); + Set shardNames = configuration.getAllShardNames(); + assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"), + ImmutableSortedSet.copyOf(shardNames)); + } + + @Test + public void testRemoveMemberReplicaForShard() { + configuration.removeMemberReplicaForShard("default", "member-2"); + String shardName = configuration.getShardNameForModule("default"); + assertEquals("ModuleShardName", "default", shardName); + ShardStrategy shardStrategy = configuration.getStrategyForModule("default"); + assertNull("ModuleStrategy", shardStrategy); + Collection members = configuration.getMembersFromShardName("default"); + assertEquals("Members", ImmutableSortedSet.of("member-1", "member-3"), + ImmutableSortedSet.copyOf(members)); + + configuration.removeMemberReplicaForShard("non-existent", "member-2"); + Set shardNames = configuration.getAllShardNames(); + assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"), + ImmutableSortedSet.copyOf(shardNames)); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java index caae615ae6..0d036d3699 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java @@ -13,6 +13,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -93,4 +94,30 @@ public class MockConfiguration implements Configuration{ public boolean isShardConfigured(String shardName) { return (shardMembers.containsKey(shardName)); } + + @Override + public void addMemberReplicaForShard (String shardName, String newMemberName) { + Map> newShardMembers = new HashMap<>(shardMembers); + for(Map.Entry> shard : newShardMembers.entrySet()) { + if (shard.getKey().equals(shardName)) { + List replicas = new ArrayList<>(shard.getValue()); + replicas.add(newMemberName); + shard.setValue(replicas); + shardMembers = ImmutableMap.>builder().putAll(newShardMembers).build(); + } + } + } + + @Override + public void removeMemberReplicaForShard (String shardName, String newMemberName) { + Map> newShardMembers = new HashMap<>(shardMembers); + for(Map.Entry> shard : newShardMembers.entrySet()) { + if (shard.getKey().equals(shardName)) { + List replicas = new ArrayList<>(shard.getValue()); + replicas.remove(newMemberName); + shard.setValue(replicas); + shardMembers = ImmutableMap.>builder().putAll(newShardMembers).build(); + } + } + } }