X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=b6ea14ff95e75653d6c6d2ae4ce559261a75674f;hb=refs%2Fchanges%2F70%2F29370%2F7;hp=6f3946d42219ed963b0ed03da2448a675165cf12;hpb=c2d1b9207fe82d36db83501e1baaffe7bc7da9ae;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 6f3946d422..b6ea14ff95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -23,6 +23,7 @@ import akka.actor.AddressFromURIString; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; +import akka.actor.Status.Failure; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.dispatch.Dispatchers; @@ -44,9 +45,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -68,6 +71,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -90,10 +95,12 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -131,6 +138,7 @@ public class ShardManagerTest extends AbstractActorTest { MockitoAnnotations.initMocks(this); InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); if(mockShardActor == null) { mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); @@ -143,6 +151,7 @@ public class ShardManagerTest extends AbstractActorTest { @After public void tearDown() { InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } private Props newShardMgrProps() { @@ -157,8 +166,7 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps(Configuration config) { - return ShardManager.props(new MockClusterWrapper(), config, - newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache); + return TestShardManager.builder(datastoreContextBuilder).configuration(config).props(); } private Props newPropsShardMgrWithMockShardActor() { @@ -172,14 +180,26 @@ public class ShardManagerTest extends AbstractActorTest { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory( - datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache); + return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config). + datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())). + waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor); } }; return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } + private TestShardManager newTestShardManager() { + return newTestShardManager(newShardMgrProps()); + } + + private TestShardManager newTestShardManager(Props props) { + TestActorRef shardManagerActor = TestActorRef.create(getSystem(), props); + TestShardManager shardManager = shardManagerActor.underlyingActor(); + shardManager.waitForRecoveryComplete(); + return shardManager; + } + @Test public void testPerShardDatastoreContext() throws Exception { final DatastoreContextFactory mockFactory = newDatastoreContextFactory( @@ -218,7 +238,9 @@ public class ShardManagerTest extends AbstractActorTest { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) { + return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig). + datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready). + primaryShardInfoCache(primaryShardInfoCache)) { @Override protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { Entry entry = shardInfoMap.get(info.getShardName()); @@ -836,8 +858,7 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), - Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); shardManager.underlyingActor().waitForRecoveryComplete(); InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); @@ -852,201 +873,168 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + TestShardManager shardManager = newTestShardManager(); - verify(ready, never()).countDown(); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, - Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); + verify(ready, never()).countDown(); - verify(ready, times(1)).countDown(); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); - }}; + verify(ready, times(1)).countDown(); } @Test public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, null, RaftState.Follower.name())); + new JavaTestKit(getSystem()) {{ + TestShardManager shardManager = newTestShardManager(); - verify(ready, never()).countDown(); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); - shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), - DataStoreVersions.CURRENT_VERSION)); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); - verify(ready, times(1)).countDown(); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); - }}; + verify(ready, times(1)).countDown(); + }}; } @Test public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, null, RaftState.Follower.name())); + new JavaTestKit(getSystem()) {{ + TestShardManager shardManager = newTestShardManager(); - verify(ready, never()).countDown(); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), - DataStoreVersions.CURRENT_VERSION)); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); - verify(ready, times(1)).countDown(); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); - }}; + verify(ready, times(1)).countDown(); + }}; } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); - verify(ready, never()).countDown(); - - }}; + verify(ready, never()).countDown(); } - @Test public void testByDefaultSyncStatusIsFalse() throws Exception{ - final Props persistentProps = newShardMgrProps(); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestShardManager shardManager = newTestShardManager(); - ShardManager shardManagerActor = shardManager.underlyingActor(); - - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{ - final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, - primaryShardInfoCache); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); - - ShardManager shardManagerActor = shardManager.underlyingActor(); - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + TestShardManager shardManager = newTestShardManager(); + + shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, RaftState.Follower.name(), RaftState.Leader.name())); - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ - final Props persistentProps = newShardMgrProps(); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestShardManager shardManager = newTestShardManager(); - ShardManager shardManagerActor = shardManager.underlyingActor(); - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + String shardId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(shardId, RaftState.Follower.name(), RaftState.Candidate.name())); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown")); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus( + true, shardId)); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{ - final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, - primaryShardInfoCache); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); - - ShardManager shardManagerActor = shardManager.underlyingActor(); - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + TestShardManager shardManager = newTestShardManager(); + + String shardId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(shardId, RaftState.Candidate.name(), RaftState.Follower.name())); // Initially will be false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Send status true will make sync status true - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown")); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId)); - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); // Send status false will make sync status false - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown")); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId)); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ - final Props persistentProps = ShardManager.props(new MockClusterWrapper(), - new MockConfiguration() { - @Override - public List getMemberShardNames(String memberName) { - return Arrays.asList("default", "astronauts"); - } - }, - newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, - primaryShardInfoCache); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); - - ShardManager shardManagerActor = shardManager.underlyingActor(); + TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() { + @Override + public List getMemberShardNames(String memberName) { + return Arrays.asList("default", "astronauts"); + } + })); // Initially will be false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make default shard leader - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId, RaftState.Follower.name(), RaftState.Leader.name())); // default = Leader, astronauts is unknown so sync status remains false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make astronauts shard leader as well - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown", + String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, RaftState.Follower.name(), RaftState.Leader.name())); // Now sync status should be true - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); // Make astronauts a Follower - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown", + shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, RaftState.Leader.name(), RaftState.Follower.name())); // Sync status is not true - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make the astronauts follower sync status true - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown")); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId)); // Sync status is now true - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); } @@ -1080,11 +1068,11 @@ public class ShardManagerTest extends AbstractActorTest { DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100). persistent(false).build(); - TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + Shard.Builder shardBuilder = Shard.builder(); ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", "foo", null, Arrays.asList("member-1", "member-5", "member-6")); - shardManager.tell(new CreateShard(config, shardPropsCreator, datastoreContext), getRef()); + shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef()); expectMsgClass(duration("5 seconds"), CreateShardReply.class); @@ -1092,19 +1080,19 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), LocalShardFound.class); - assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent()); - assertTrue("Epxected ShardPeerAddressResolver", shardPropsCreator.datastoreContext.getShardRaftConfig(). + assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent()); + assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig(). getPeerAddressResolver() instanceof ShardPeerAddressResolver); assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(), new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()), - shardPropsCreator.peerAddresses.keySet()); + shardBuilder.getPeerAddresses().keySet()); assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix), - shardPropsCreator.shardId); - assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); + shardBuilder.getId()); + assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); // Send CreateShard with same name - should fail. - shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef()); + shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); }}; @@ -1116,11 +1104,11 @@ public class ShardManagerTest extends AbstractActorTest { ActorRef shardManager = getSystem().actorOf(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); - TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + Shard.Builder shardBuilder = Shard.builder(); ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", "foo", null, Arrays.asList("member-1")); - shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef()); + shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); expectMsgClass(duration("5 seconds"), CreateShardReply.class); @@ -1131,11 +1119,51 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), LocalShardFound.class); - assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); - assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext); + assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); + assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext()); }}; } + @Test + public void testGetSnapshot() throws Throwable { + JavaTestKit kit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). + put("shard1", Arrays.asList("member-1")). + put("shard2", Arrays.asList("member-1")).build()); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher( + Dispatchers.DefaultDispatcherId())); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + Failure failure = kit.expectMsgClass(Failure.class); + assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass()); + + kit = new JavaTestKit(getSystem()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard("shard1", true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + shardManager.tell(new FindLocalShard("shard2", true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + + DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class); + + assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); + List shardSnapshots = datastoreSnapshot.getShardSnapshots(); + Set actualShardNames = new HashSet<>(); + for(ShardSnapshot s: shardSnapshots) { + actualShardNames.add(s.getName()); + } + + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames); + + shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + @Test public void testAddShardReplicaForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -1215,7 +1243,8 @@ public class ShardManagerTest extends AbstractActorTest { AddServer.class); String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); - + newReplicaShardManager.underlyingActor() + .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts")); expectMsgClass(duration("5 seconds"), Status.Success.class); }}; @@ -1269,31 +1298,45 @@ public class ShardManagerTest extends AbstractActorTest { } - private static class TestShardPropsCreator implements ShardPropsCreator { - ShardIdentifier shardId; - Map peerAddresses; - SchemaContext schemaContext; - DatastoreContext datastoreContext; + @Test + public void testShardPersistenceWithRestoredData() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")). + put("people", Arrays.asList("member-1", "member-2")).build()); + String[] restoredShards = {"default", "astronauts"}; + ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot); - @Override - public Props newProps(ShardIdentifier shardId, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { - this.shardId = shardId; - this.peerAddresses = peerAddresses; - this.schemaContext = schemaContext; - this.datastoreContext = datastoreContext; - return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext); - } + //create shardManager to come up with restored data + TestActorRef newRestoredShardManager = TestActorRef.create(getSystem(), + newShardMgrProps(mockConfig)); + + newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); + + newRestoredShardManager.tell(new FindLocalShard("people", false), getRef()); + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + assertEquals("for uninitialized shard", "people", notFound.getShardName()); + + //Verify a local shard is created for the restored shards, + //although we expect a NotInitializedException for the shards as the actor initialization + //message is not sent for them + newRestoredShardManager.tell(new FindLocalShard("default", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; } + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); - TestShardManager(String shardMrgIDSuffix) { - super(new MockClusterWrapper(), new MockConfiguration(), - newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()), - ready, new PrimaryShardInfoFutureCache()); + private TestShardManager(Builder builder) { + super(builder); } @Override @@ -1311,21 +1354,24 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } - } - @SuppressWarnings("serial") - static class TestShardManagerCreator implements Creator { - String shardMrgIDSuffix; - - TestShardManagerCreator(String shardMrgIDSuffix) { - this.shardMrgIDSuffix = shardMrgIDSuffix; + public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { + return new Builder(datastoreContextBuilder); } - @Override - public TestShardManager create() throws Exception { - return new TestShardManager(shardMrgIDSuffix); - } + private static class Builder extends ShardManager.Builder { + Builder(DatastoreContext.Builder datastoreContextBuilder) { + cluster(new MockClusterWrapper()).configuration(new MockConfiguration()); + datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); + waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()); + } + @Override + public Props props() { + verify(); + return Props.create(TestShardManager.class, this); + } + } } private static class DelegatingShardManagerCreator implements Creator { @@ -1350,11 +1396,11 @@ public class ShardManagerTest extends AbstractActorTest { private CountDownLatch memberReachableReceived = new CountDownLatch(1); private final ActorRef shardActor; private final String name; + private final CountDownLatch snapshotPersist = new CountDownLatch(1); + private ShardManagerSnapshot snapshot; - protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name, - ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) { - super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache); + public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) { + super(builder); this.shardActor = shardActor; this.name = name; } @@ -1430,6 +1476,18 @@ public class ShardManagerTest extends AbstractActorTest { Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); findPrimaryMessageReceived = new CountDownLatch(1); } + + @Override + public void saveSnapshot(Object obj) { + snapshot = (ShardManagerSnapshot) obj; + snapshotPersist.countDown(); + } + + void verifySnapshotPersisted(Set shardList) { + assertEquals("saveSnapshot invoked", true, + Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); + assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); + } } private static class MockRespondActor extends MessageCollectorActor {