X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=b6ea14ff95e75653d6c6d2ae4ce559261a75674f;hp=29cb189fdaf252dd1d533e5a0020eb71784d0a0c;hb=60dbe8adeda3af724255231af9400341b17953b9;hpb=469f369b8acba18877badab60f9aa68a4ff354e7 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 29cb189fda..b6ea14ff95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -100,6 +100,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -137,6 +138,7 @@ public class ShardManagerTest extends AbstractActorTest { MockitoAnnotations.initMocks(this); InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); if(mockShardActor == null) { mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); @@ -149,6 +151,7 @@ public class ShardManagerTest extends AbstractActorTest { @After public void tearDown() { InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } private Props newShardMgrProps() { @@ -163,9 +166,7 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps(Configuration config) { - return ShardManager.builder().cluster(new MockClusterWrapper()).configuration(config). - datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())). - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache).props(); + return TestShardManager.builder(datastoreContextBuilder).configuration(config).props(); } private Props newPropsShardMgrWithMockShardActor() { @@ -188,6 +189,17 @@ public class ShardManagerTest extends AbstractActorTest { return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } + private TestShardManager newTestShardManager() { + return newTestShardManager(newShardMgrProps()); + } + + private TestShardManager newTestShardManager(Props props) { + TestActorRef shardManagerActor = TestActorRef.create(getSystem(), props); + TestShardManager shardManager = shardManagerActor.underlyingActor(); + shardManager.waitForRecoveryComplete(); + return shardManager; + } + @Test public void testPerShardDatastoreContext() throws Exception { final DatastoreContextFactory mockFactory = newDatastoreContextFactory( @@ -846,8 +858,7 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), - Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); shardManager.underlyingActor().waitForRecoveryComplete(); InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); @@ -862,196 +873,168 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); - verify(ready, never()).countDown(); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, - Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); - verify(ready, times(1)).countDown(); - - }}; + verify(ready, times(1)).countDown(); } @Test public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, null, RaftState.Follower.name())); + new JavaTestKit(getSystem()) {{ + TestShardManager shardManager = newTestShardManager(); - verify(ready, never()).countDown(); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); - shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), - DataStoreVersions.CURRENT_VERSION)); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); - verify(ready, times(1)).countDown(); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); - }}; + verify(ready, times(1)).countDown(); + }}; } @Test public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, null, RaftState.Follower.name())); + new JavaTestKit(getSystem()) {{ + TestShardManager shardManager = newTestShardManager(); - verify(ready, never()).countDown(); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); - shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), - DataStoreVersions.CURRENT_VERSION)); + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); - verify(ready, times(1)).countDown(); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); - }}; + verify(ready, times(1)).countDown(); + }}; } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); - verify(ready, never()).countDown(); - - }}; + verify(ready, never()).countDown(); } - @Test public void testByDefaultSyncStatusIsFalse() throws Exception{ - final Props persistentProps = newShardMgrProps(); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); - - ShardManager shardManagerActor = shardManager.underlyingActor(); + TestShardManager shardManager = newTestShardManager(); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{ - final TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - ShardManager shardManagerActor = shardManager.underlyingActor(); - shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, RaftState.Follower.name(), RaftState.Leader.name())); - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ - final TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - ShardManager shardManagerActor = shardManager.underlyingActor(); String shardId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId, + shardManager.onReceiveCommand(new RoleChangeNotification(shardId, RaftState.Follower.name(), RaftState.Candidate.name())); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus( + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus( true, shardId)); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{ - final Props persistentProps = newShardMgrProps(); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestShardManager shardManager = newTestShardManager(); String shardId = "member-1-shard-default-" + shardMrgIDSuffix; - ShardManager shardManagerActor = shardManager.underlyingActor(); - shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId, + shardManager.onReceiveCommand(new RoleChangeNotification(shardId, RaftState.Candidate.name(), RaftState.Follower.name())); // Initially will be false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Send status true will make sync status true - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId)); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId)); - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); // Send status false will make sync status false - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId)); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId)); - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); } @Test public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ - final Props persistentProps = newShardMgrProps(new MockConfiguration() { + TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() { @Override public List getMemberShardNames(String memberName) { return Arrays.asList("default", "astronauts"); } - }); - - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); - - ShardManager shardManagerActor = shardManager.underlyingActor(); + })); // Initially will be false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make default shard leader String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManagerActor.onReceiveCommand(new RoleChangeNotification(defaultShardId, + shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId, RaftState.Follower.name(), RaftState.Leader.name())); // default = Leader, astronauts is unknown so sync status remains false - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make astronauts shard leader as well String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix; - shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId, + shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, RaftState.Follower.name(), RaftState.Leader.name())); // Now sync status should be true - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); // Make astronauts a Follower - shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId, + shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, RaftState.Leader.name(), RaftState.Follower.name())); // Sync status is not true - assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(false, shardManager.getMBean().getSyncStatus()); // Make the astronauts follower sync status true - shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId)); + shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId)); // Sync status is now true - assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + assertEquals(true, shardManager.getMBean().getSyncStatus()); } @@ -1260,7 +1243,8 @@ public class ShardManagerTest extends AbstractActorTest { AddServer.class); String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); - + newReplicaShardManager.underlyingActor() + .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts")); expectMsgClass(duration("5 seconds"), Status.Success.class); }}; @@ -1314,13 +1298,45 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testShardPersistenceWithRestoredData() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")). + put("people", Arrays.asList("member-1", "member-2")).build()); + String[] restoredShards = {"default", "astronauts"}; + ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot); + + //create shardManager to come up with restored data + TestActorRef newRestoredShardManager = TestActorRef.create(getSystem(), + newShardMgrProps(mockConfig)); + + newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); + + newRestoredShardManager.tell(new FindLocalShard("people", false), getRef()); + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + assertEquals("for uninitialized shard", "people", notFound.getShardName()); + + //Verify a local shard is created for the restored shards, + //although we expect a NotInitializedException for the shards as the actor initialization + //message is not sent for them + newRestoredShardManager.tell(new FindLocalShard("default", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; + } + + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); - TestShardManager(String shardMrgIDSuffix) { - super(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(new MockConfiguration()). - datastoreContextFactory(newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build())). - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache())); + private TestShardManager(Builder builder) { + super(builder); } @Override @@ -1338,21 +1354,24 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } - } - @SuppressWarnings("serial") - static class TestShardManagerCreator implements Creator { - String shardMrgIDSuffix; - - TestShardManagerCreator(String shardMrgIDSuffix) { - this.shardMrgIDSuffix = shardMrgIDSuffix; + public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { + return new Builder(datastoreContextBuilder); } - @Override - public TestShardManager create() throws Exception { - return new TestShardManager(shardMrgIDSuffix); - } + private static class Builder extends ShardManager.Builder { + Builder(DatastoreContext.Builder datastoreContextBuilder) { + cluster(new MockClusterWrapper()).configuration(new MockConfiguration()); + datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); + waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()); + } + @Override + public Props props() { + verify(); + return Props.create(TestShardManager.class, this); + } + } } private static class DelegatingShardManagerCreator implements Creator { @@ -1377,6 +1396,8 @@ public class ShardManagerTest extends AbstractActorTest { private CountDownLatch memberReachableReceived = new CountDownLatch(1); private final ActorRef shardActor; private final String name; + private final CountDownLatch snapshotPersist = new CountDownLatch(1); + private ShardManagerSnapshot snapshot; public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) { super(builder); @@ -1455,6 +1476,18 @@ public class ShardManagerTest extends AbstractActorTest { Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); findPrimaryMessageReceived = new CountDownLatch(1); } + + @Override + public void saveSnapshot(Object obj) { + snapshot = (ShardManagerSnapshot) obj; + snapshotPersist.countDown(); + } + + void verifySnapshotPersisted(Set shardList) { + assertEquals("saveSnapshot invoked", true, + Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); + assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); + } } private static class MockRespondActor extends MessageCollectorActor {