import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
public class ShardManagerTest extends AbstractActorTest {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+ private static final MemberName MEMBER_3 = MemberName.forName("member-3");
private static int ID_COUNTER = 1;
InMemorySnapshotStore.clear();
if(mockShardActor == null) {
- mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config");
+ mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, MEMBER_1, "config");
mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
mockShardName.toString());
}
}
private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
- String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ String name = new ShardIdentifier(shardName, MemberName.forName(memberName), "config").toString();
if(system == getSystem()) {
return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
}
final MockConfiguration mockConfig = new MockConfiguration() {
@Override
- public Collection<String> getMemberShardNames(String memberName) {
+ public Collection<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "topology");
}
@Override
- public Collection<String> getMembersFromShardName(String shardName) {
- return Arrays.asList("member-1");
+ public Collection<MemberName> getMembersFromShardName(String shardName) {
+ return members("member-1");
}
};
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
shardManager1.underlyingActor().waitForMemberUp();
-
shardManager1.tell(new FindPrimary("astronauts", false), getRef());
RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
shardManager1.underlyingActor().waitForUnreachableMember();
PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
- assertEquals("getMemberName", "member-2", peerDown.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(MockClusterWrapper.
shardManager1.underlyingActor().waitForReachableMember();
PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
- assertEquals("getMemberName", "member-2", peerUp.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(new FindPrimary("default", true), getRef());
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
- public List<String> getMemberShardNames(String memberName) {
+ public List<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "astronauts");
}
}));
}};
}
+ private static List<MemberName> members(String... names) {
+ return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
+ }
+
@Test
public void testOnCreateShard() {
LOG.info("testOnCreateShard starting");
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
+ "foo", null, members("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
getPeerAddressResolver() instanceof ShardPeerAddressResolver);
- assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
- new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
- shardBuilder.getPeerAddresses().keySet());
- assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
+ assertEquals("peerMembers", Sets.newHashSet(
+ new ShardIdentifier("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
+ new ShardIdentifier("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
+ shardBuilder.getPeerAddresses().keySet());
+ assertEquals("ShardIdentifier", new ShardIdentifier("foo", MEMBER_1, shardMrgIDSuffix),
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-5", "member-6"));
+ "foo", null, members("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1"));
+ "foo", null, members("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
- Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
- @Override
- public String apply(ShardSnapshot s) {
- return s.getName();
- }
- };
+ Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
+ String name = new ShardIdentifier("astronauts", MEMBER_2, "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
TestActorRef.create(system2, Props.create(MockRespondActor.class).
withDispatcher(Dispatchers.DefaultDispatcherId()), name);
ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
- shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
assertEquals("Failure obtained", true,
(resp.cause() instanceof PrimaryNotFoundException));
RaftState.Leader.name())), respondActor);
respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
- shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
- assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
+ assertEquals(new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Success.class);
}};
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
+ String name = new ShardIdentifier("default", MEMBER_2, shardMrgIDSuffix).toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
//construct a mock response message
RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
mockShardLeaderActor.underlyingActor().updateResponse(response);
- newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
+ newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
RemoveServer.class);
- String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString();
+ String removeServerId = new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Status.Success.class);
}};
@Test
public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
- testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"),
- RemoveServer.class, new RemoveShardReplica("astronauts", "member-3"));
+ testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
+ RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
}
@Test
public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
- AddServer.class, new RemoveShardReplica("astronauts", "member-2"));
+ AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
}
// Removed the default shard replica from member-1
ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
- ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
+ ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build();
shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
+ String shardId = ShardIdentifier.builder().shardName("default").memberName(MEMBER_1).
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
MessageCollectorActor.props(), shardId);
put("shard1", Arrays.asList("member-1")).
put("shard2", Arrays.asList("member-1")).build());
- String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
+ String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName(MEMBER_1).
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
- String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
+ String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName(MEMBER_1).
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
}
}
+ private void countDownIfOther(final Member member, CountDownLatch latch) {
+ if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
+ latch.countDown();
+ }
+ }
+
@Override
public void handleCommand(Object message) throws Exception {
try{
if(message instanceof FindPrimary) {
findPrimaryMessageReceived.countDown();
} else if(message instanceof ClusterEvent.MemberUp) {
- String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUpReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived);
} else if(message instanceof ClusterEvent.MemberRemoved) {
- String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberRemovedReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived);
} else if(message instanceof ClusterEvent.UnreachableMember) {
- String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUnreachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived);
} else if(message instanceof ClusterEvent.ReachableMember) {
- String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberReachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived);
}
}
}