X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManagerTest.java;h=bd452a73e697ede549f68f38fbed4b5f276284a9;hp=1ac21daaff3bcdf5c5686ede4f1570b84d26ccc0;hb=4e3f49788c05730b29468deebc2aaa4ed0d94eef;hpb=6a7cc4dcc9909a286dad1267e633af6313bd9059 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index 1ac21daaff..bd452a73e6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -27,6 +27,7 @@ import akka.actor.Status.Failure; import akka.actor.Status.Success; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; +import akka.cluster.Member; import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; @@ -57,6 +58,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; @@ -64,6 +66,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; @@ -132,6 +135,9 @@ import scala.concurrent.duration.FiniteDuration; public class ShardManagerTest extends AbstractActorTest { private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class); + private static final MemberName MEMBER_1 = MemberName.forName("member-1"); + private static final MemberName MEMBER_2 = MemberName.forName("member-2"); + private static final MemberName MEMBER_3 = MemberName.forName("member-3"); private static int ID_COUNTER = 1; @@ -161,7 +167,7 @@ public class ShardManagerTest extends AbstractActorTest { InMemorySnapshotStore.clear(); if(mockShardActor == null) { - mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config"); + mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, MEMBER_1, "config"); mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName.toString()); } @@ -188,7 +194,7 @@ public class ShardManagerTest extends AbstractActorTest { } private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { - String name = new ShardIdentifier(shardName, memberName,"config").toString(); + String name = new ShardIdentifier(shardName, MemberName.forName(memberName), "config").toString(); if(system == getSystem()) { return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name); } @@ -289,13 +295,13 @@ public class ShardManagerTest extends AbstractActorTest { final MockConfiguration mockConfig = new MockConfiguration() { @Override - public Collection getMemberShardNames(String memberName) { + public Collection getMemberShardNames(MemberName memberName) { return Arrays.asList("default", "topology"); } @Override - public Collection getMembersFromShardName(String shardName) { - return Arrays.asList("member-1"); + public Collection getMembersFromShardName(String shardName) { + return members("member-1"); } }; @@ -686,7 +692,6 @@ public class ShardManagerTest extends AbstractActorTest { RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); shardManager1.underlyingActor().waitForMemberUp(); - shardManager1.tell(new FindPrimary("astronauts", false), getRef()); RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); @@ -773,7 +778,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.underlyingActor().waitForUnreachableMember(); PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); - assertEquals("getMemberName", "member-2", peerDown.getMemberName()); + assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName()); MessageCollectorActor.clearMessages(mockShardActor1); shardManager1.tell(MockClusterWrapper. @@ -791,7 +796,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.underlyingActor().waitForReachableMember(); PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); - assertEquals("getMemberName", "member-2", peerUp.getMemberName()); + assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName()); MessageCollectorActor.clearMessages(mockShardActor1); shardManager1.tell(new FindPrimary("default", true), getRef()); @@ -1126,7 +1131,7 @@ public class ShardManagerTest extends AbstractActorTest { LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting"); TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() { @Override - public List getMemberShardNames(String memberName) { + public List getMemberShardNames(MemberName memberName) { return Arrays.asList("default", "astronauts"); } })); @@ -1183,6 +1188,10 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + private static List members(String... names) { + return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList()); + } + @Test public void testOnCreateShard() { LOG.info("testOnCreateShard starting"); @@ -1200,7 +1209,7 @@ public class ShardManagerTest extends AbstractActorTest { Shard.Builder shardBuilder = Shard.builder(); ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", - "foo", null, Arrays.asList("member-1", "member-5", "member-6")); + "foo", null, members("member-1", "member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef()); expectMsgClass(duration("5 seconds"), Success.class); @@ -1212,10 +1221,11 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent()); assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig(). getPeerAddressResolver() instanceof ShardPeerAddressResolver); - assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(), - new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()), - shardBuilder.getPeerAddresses().keySet()); - assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix), + assertEquals("peerMembers", Sets.newHashSet( + new ShardIdentifier("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(), + new ShardIdentifier("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()), + shardBuilder.getPeerAddresses().keySet()); + assertEquals("ShardIdentifier", new ShardIdentifier("foo", MEMBER_1, shardMrgIDSuffix), shardBuilder.getId()); assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); @@ -1243,7 +1253,7 @@ public class ShardManagerTest extends AbstractActorTest { Shard.Builder shardBuilder = Shard.builder(); ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", - "foo", null, Arrays.asList("member-5", "member-6")); + "foo", null, members("member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); expectMsgClass(duration("5 seconds"), Success.class); @@ -1269,7 +1279,7 @@ public class ShardManagerTest extends AbstractActorTest { Shard.Builder shardBuilder = Shard.builder(); ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", - "foo", null, Arrays.asList("member-1")); + "foo", null, members("member-1")); shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); expectMsgClass(duration("5 seconds"), Success.class); @@ -1317,12 +1327,7 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot()); - Function shardNameTransformer = new Function() { - @Override - public String apply(ShardSnapshot s) { - return s.getName(); - } - }; + Function shardNameTransformer = s -> s.getName(); assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet( Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); @@ -1437,7 +1442,7 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); + String name = new ShardIdentifier("astronauts", MEMBER_2, "config").toString(); final TestActorRef mockShardLeaderActor = TestActorRef.create(system2, Props.create(MockRespondActor.class). withDispatcher(Dispatchers.DefaultDispatcherId()), name); @@ -1654,7 +1659,7 @@ public class ShardManagerTest extends AbstractActorTest { ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); - shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef()); + shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef()); Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class); assertEquals("Failure obtained", true, (resp.cause() instanceof PrimaryNotFoundException)); @@ -1682,9 +1687,9 @@ public class ShardManagerTest extends AbstractActorTest { RaftState.Leader.name())), respondActor); respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null)); - shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef()); + shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef()); final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class); - assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(), + assertEquals(new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(), removeServer.getServerId()); expectMsgClass(duration("5 seconds"), Success.class); }}; @@ -1713,7 +1718,7 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString(); + String name = new ShardIdentifier("default", MEMBER_2, shardMrgIDSuffix).toString(); final TestActorRef mockShardLeaderActor = TestActorRef.create(system2, Props.create(MockRespondActor.class), name); @@ -1768,10 +1773,10 @@ public class ShardManagerTest extends AbstractActorTest { //construct a mock response message RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2); mockShardLeaderActor.underlyingActor().updateResponse(response); - newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef()); + newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef()); RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, RemoveServer.class); - String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(); + String removeServerId = new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(); assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); expectMsgClass(duration("5 seconds"), Status.Success.class); }}; @@ -1780,14 +1785,14 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception { - testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"), - RemoveServer.class, new RemoveShardReplica("astronauts", "member-3")); + testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2), + RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3)); } @Test public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception { testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"), - AddServer.class, new RemoveShardReplica("astronauts", "member-2")); + AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2)); } @@ -1842,7 +1847,7 @@ public class ShardManagerTest extends AbstractActorTest { // Removed the default shard replica from member-1 ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); - ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build(); + ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build(); shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); @@ -1861,7 +1866,7 @@ public class ShardManagerTest extends AbstractActorTest { put("astronauts", Arrays.asList("member-2")). put("people", Arrays.asList("member-1", "member-2")).build()); - String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1"). + String shardId = ShardIdentifier.builder().shardName("default").memberName(MEMBER_1). type(shardMrgIDSuffix).build().toString(); TestActorRef shard = actorFactory.createTestActor( MessageCollectorActor.props(), shardId); @@ -1934,12 +1939,12 @@ public class ShardManagerTest extends AbstractActorTest { put("shard1", Arrays.asList("member-1")). put("shard2", Arrays.asList("member-1")).build()); - String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1"). + String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName(MEMBER_1). type(shardMrgIDSuffix).build().toString(); TestActorRef shard1 = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1); - String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1"). + String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName(MEMBER_1). type(shardMrgIDSuffix).build().toString(); TestActorRef shard2 = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2); @@ -2005,6 +2010,12 @@ public class ShardManagerTest extends AbstractActorTest { } } + private void countDownIfOther(final Member member, CountDownLatch latch) { + if (!getCluster().getCurrentMemberName().equals(memberToName(member))) { + latch.countDown(); + } + } + @Override public void handleCommand(Object message) throws Exception { try{ @@ -2017,25 +2028,13 @@ public class ShardManagerTest extends AbstractActorTest { if(message instanceof FindPrimary) { findPrimaryMessageReceived.countDown(); } else if(message instanceof ClusterEvent.MemberUp) { - String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next(); - if(!getCluster().getCurrentMemberName().equals(role)) { - memberUpReceived.countDown(); - } + countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived); } else if(message instanceof ClusterEvent.MemberRemoved) { - String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next(); - if(!getCluster().getCurrentMemberName().equals(role)) { - memberRemovedReceived.countDown(); - } + countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived); } else if(message instanceof ClusterEvent.UnreachableMember) { - String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next(); - if(!getCluster().getCurrentMemberName().equals(role)) { - memberUnreachableReceived.countDown(); - } + countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived); } else if(message instanceof ClusterEvent.ReachableMember) { - String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next(); - if(!getCluster().getCurrentMemberName().equals(role)) { - memberReachableReceived.countDown(); - } + countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived); } } }