X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=ae7a4f96c53fec04dbadbb612c9bc0369952f654;hp=99417076bfa3087a38f57f5f6a7cc5b6f0676bb1;hb=468b9523001807db03b3a545328ac9bf819278c7;hpb=753515e8868a1a15982d3f2697439f522f273db5 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 99417076bf..ae7a4f96c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; @@ -44,9 +45,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; @@ -56,6 +59,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; public class ShardManagerTest extends AbstractActorTest { private static int ID_COUNTER = 1; @@ -66,7 +70,10 @@ public class ShardManagerTest extends AbstractActorTest { @Mock private static CountDownLatch ready; - private static ActorRef mockShardActor; + private static TestActorRef mockShardActor; + + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). + dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); @Before public void setUp() { @@ -75,9 +82,11 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); if(mockShardActor == null) { - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); - mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name); + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); } + + mockShardActor.underlyingActor().clear(); } @After @@ -86,44 +95,93 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps() { - DatastoreContext.Builder builder = DatastoreContext.newBuilder(); - builder.dataStoreType(shardMrgIDSuffix); return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - builder.build(), ready); + datastoreContextBuilder.build(), ready); + } + + private Props newPropsShardMgrWithMockShardActor() { + Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), + datastoreContextBuilder.build(), ready) { + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return mockShardActor; + } + }; + } + }; + + return Props.create(new DelegatingShardManagerCreator(creator)); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); - expectMsgEquals(duration("5 seconds"), - new PrimaryNotFound("non-existent").toSerializable()); + expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable()); }}; } @Test - public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + + MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), mockShardActor); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; } @Test - public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception { + public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-2-shard-default")); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); @@ -132,28 +190,129 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception { + public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); // We're passing waitUntilInitialized = true to FindPrimary so the response should be - // delayed until we send ActorInitialized. - Future future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true), - new Timeout(5, TimeUnit.SECONDS)); + // delayed until we send ActorInitialized and RoleChangeNotification. + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); shardManager.tell(new ActorInitialized(), mockShardActor); - Object resp = Await.result(future, duration("5 seconds")); - assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound); + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), ActorNotInitialized.class); + + shardManager.tell(new ActorInitialized(), mockShardActor); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.Candidate.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -168,7 +327,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -185,7 +344,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); @@ -196,7 +355,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -439,14 +598,11 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); verify(ready, times(1)).countDown(); @@ -457,14 +613,10 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); verify(ready, never()).countDown();