X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=b676cf225c801039d1e679298e6d7cb23d2808ac;hp=ae7a4f96c53fec04dbadbb612c9bc0369952f654;hb=8eaba1eb027b02f8b36480721055dc99c6700e85;hpb=3104f91c7d1b3ee5914d8778f87315f4ac64036d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index ae7a4f96c5..b676cf225c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -9,16 +9,23 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AddressFromURIString; import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -35,15 +42,16 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; @@ -75,6 +83,11 @@ public class ShardManagerTest extends AbstractActorTest { private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); + } + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -100,21 +113,22 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newPropsShardMgrWithMockShardActor() { + return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), + new MockConfiguration()); + } + + private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, + final ClusterWrapper clusterWrapper, final Configuration config) { Creator creator = new Creator() { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.build(), ready) { - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - return mockShardActor; - } - }; + return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), + ready, name, shardActor); } }; - return Props.create(new DelegatingShardManagerCreator(creator)); + return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } @Test @@ -124,9 +138,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary("non-existent", false), getRef()); - expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable()); + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; } @@ -146,9 +160,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name())), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; @@ -170,9 +184,9 @@ public class ShardManagerTest extends AbstractActorTest { RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); }}; @@ -183,9 +197,9 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; } @@ -197,7 +211,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; @@ -215,15 +229,15 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; @@ -238,7 +252,7 @@ public class ShardManagerTest extends AbstractActorTest { // We're passing waitUntilInitialized = true to FindPrimary so the response should be // delayed until we send ActorInitialized and RoleChangeNotification. - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); @@ -254,7 +268,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); @@ -269,9 +283,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); - expectMsgClass(duration("2 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("2 seconds"), NotInitializedException.class); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -289,7 +303,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, RaftState.Candidate.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; @@ -303,12 +317,78 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } + @Test + public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + + shardManager2.underlyingActor().verifyFindPrimary(); + + Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForMemberRemoved(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -348,7 +428,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; } @@ -371,42 +451,6 @@ public class ShardManagerTest extends AbstractActorTest { }}; } - @Test - public void testOnReceiveMemberUp() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"), - PrimaryFound.SERIALIZABLE_CLASS)); - String path = found.getPrimaryPath(); - assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config")); - }}; - } - - @Test - public void testOnReceiveMemberDown() throws Exception { - - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); - - MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); - }}; - } - @Test public void testOnRecoveryJournalIsCleaned() { InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( @@ -595,7 +639,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationReleaseReady() throws Exception { + public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); @@ -604,11 +648,35 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); + verify(ready, times(1)).countDown(); }}; } + @Test + public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); + + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); + + verify(ready, times(1)).countDown(); + + }}; + } + + @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { @@ -804,4 +872,69 @@ public class ShardManagerTest extends AbstractActorTest { return delegate.create(); } } + + private static class ForwardingShardManager extends ShardManager { + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private final ActorRef shardActor; + private final String name; + + protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + ActorRef shardActor) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + this.shardActor = shardActor; + this.name = name; + } + + @Override + public void handleCommand(Object message) throws Exception { + try{ + super.handleCommand(message); + } finally { + if(message instanceof FindPrimary) { + findPrimaryMessageReceived.countDown(); + } else if(message instanceof ClusterEvent.MemberUp) { + String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUpReceived.countDown(); + } + } else if(message instanceof ClusterEvent.MemberRemoved) { + String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberRemovedReceived.countDown(); + } + } + } + } + + @Override + public String persistenceId() { + return name; + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return shardActor; + } + + void waitForMemberUp() { + assertEquals("MemberUp received", true, + Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); + memberUpReceived = new CountDownLatch(1); + } + + void waitForMemberRemoved() { + assertEquals("MemberRemoved received", true, + Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS)); + memberRemovedReceived = new CountDownLatch(1); + } + + void verifyFindPrimary() { + assertEquals("FindPrimary received", true, + Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); + findPrimaryMessageReceived = new CountDownLatch(1); + } + } }