import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
-import org.opendaylight.controller.cluster.datastore.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
-import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
public class ShardManagerTest extends AbstractActorTest {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+ private static final MemberName MEMBER_3 = MemberName.forName("member-3");
private static int ID_COUNTER = 1;
private static TestActorRef<MessageCollectorActor> mockShardActor;
- private static String mockShardName;
+ private static ShardIdentifier mockShardName;
private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
InMemorySnapshotStore.clear();
if(mockShardActor == null) {
- mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
- mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
+ mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
+ mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
+ mockShardName.toString());
}
mockShardActor.underlyingActor().clear();
}
private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
- String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if(system == getSystem()) {
return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
}
private Props newPropsShardMgrWithMockShardActor() {
- return newTestShardMgrBuilderWithMockShardActor().props();
+ return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
+ Dispatchers.DefaultDispatcherId());
}
private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
final MockConfiguration mockConfig = new MockConfiguration() {
@Override
- public Collection<String> getMemberShardNames(String memberName) {
+ public Collection<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "topology");
}
@Override
- public Collection<String> getMembersFromShardName(String shardName) {
- return Arrays.asList("member-1");
+ public Collection<MemberName> getMembersFromShardName(String shardName) {
+ return members("member-1");
}
};
final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
final CountDownLatch newShardActorLatch = new CountDownLatch(2);
class LocalShardManager extends ShardManager {
- public LocalShardManager(AbstractBuilder<?> builder) {
- super(builder);
+ public LocalShardManager(AbstractShardManagerCreator<?> creator) {
+ super(creator);
}
@Override
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new LocalShardManager(new GenericBuilder<LocalShardManager>(LocalShardManager.class).
+ return new LocalShardManager(new GenericCreator<LocalShardManager>(LocalShardManager.class).
datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
configuration(mockConfig));
}
shardManager.tell(new ActorInitialized(), mockShardActor);
DataTree mockDataTree = mock(DataTree.class);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), getRef());
MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
shardManager.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
- shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
- leaderVersion), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
DataTree mockDataTree = mock(DataTree.class);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
DataTree mockDataTree = mock(DataTree.class);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
+ mock(DataTree.class), leaderVersion), mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
shardManager1.underlyingActor().waitForMemberUp();
-
shardManager1.tell(new FindPrimary("astronauts", false), getRef());
RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
- Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
+ mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
shardManager1.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
- shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
+ shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION),
mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
shardManager1.underlyingActor().waitForUnreachableMember();
PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
- assertEquals("getMemberName", "member-2", peerDown.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(MockClusterWrapper.
shardManager1.underlyingActor().waitForReachableMember();
PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
- assertEquals("getMemberName", "member-2", peerUp.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(new FindPrimary("default", true), getRef());
String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
- Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
+ mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
shardManager1.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
- shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
+ shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION),
mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
- mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
+ mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
shardManager1.tell(MockClusterWrapper.
createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
- shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
+ shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION), mockShardActor1);
shardManager1.tell(new RoleChangeNotification(memberId1,
RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
- TestShardManager shardManager = newTestShardManager();
+ newTestShardManager();
InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
verify(ready, never()).countDown();
shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
- Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
+ mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
verify(ready, times(1)).countDown();
}
shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
- "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+ "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION));
verify(ready, times(1)).countDown();
verify(ready, never()).countDown();
shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
- "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+ "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION));
shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
- public List<String> getMemberShardNames(String memberName) {
+ public List<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "astronauts");
}
}));
}};
}
+ private static List<MemberName> members(String... names) {
+ return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
+ }
+
@Test
public void testOnCreateShard() {
LOG.info("testOnCreateShard starting");
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
+ "foo", null, members("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
getPeerAddressResolver() instanceof ShardPeerAddressResolver);
- assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
- new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
- shardBuilder.getPeerAddresses().keySet());
- assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
+ assertEquals("peerMembers", Sets.newHashSet(
+ ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
+ ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
+ shardBuilder.getPeerAddresses().keySet());
+ assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-5", "member-6"));
+ "foo", null, members("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1"));
+ "foo", null, members("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
- Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
- @Override
- public String apply(ShardSnapshot s) {
- return s.getName();
- }
- };
+ Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class).
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
+ new AddServerReply(ServerChangeStatus.OK, memberId2)).
withDispatcher(Dispatchers.DefaultDispatcherId()), name);
final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
+ mock(DataTree.class), leaderVersion), mockShardLeaderActor);
leaderShardManager.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
//construct a mock response message
- AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
AddServer.class);
String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
- Props.create(MockRespondActor.class, addServerReply), leaderId);
+ Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.tell(new RoleChangeNotification(newReplicaId,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
- shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
+ shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION), getRef());
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), mockShardActor);
ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
- shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
assertEquals("Failure obtained", true,
(resp.cause() instanceof PrimaryNotFoundException));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> respondActor =
- TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), respondActor);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION), getRef());
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), respondActor);
- respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
- shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
- assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
+ assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Success.class);
}};
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
+ String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
+ mock(DataTree.class), leaderVersion), mockShardLeaderActor);
leaderShardManager.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor);
+ mock(DataTree.class), leaderVersion), mockShardActor);
newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
leaderShardManager.underlyingActor().waitForMemberUp();
//construct a mock response message
- RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
- newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
+ newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
RemoveServer.class);
- String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString();
+ String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Status.Success.class);
}};
@Test
public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
- testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"),
- RemoveServer.class, new RemoveShardReplica("astronauts", "member-3"));
+ testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
+ RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
}
@Test
public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
- AddServer.class, new RemoveShardReplica("astronauts", "member-2"));
+ AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
}
// Removed the default shard replica from member-1
ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
- ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
+ ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build();
shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
- type(shardMrgIDSuffix).build().toString();
+ String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
MessageCollectorActor.props(), shardId);
put("shard1", Arrays.asList("member-1")).
put("shard2", Arrays.asList("member-1")).build());
- String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
- type(shardMrgIDSuffix).build().toString();
+ String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
- String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
- type(shardMrgIDSuffix).build().toString();
+ String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
LOG.info("testShutDown ending");
}
+ @Test
+ public void testChangeServersVotingStatus() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
+ respondActor, ChangeServersVotingStatus.class);
+ assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
+ ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"),
+ shardMrgIDSuffix).toString(), Boolean.TRUE));
+
+ expectMsgClass(duration("5 seconds"), Success.class);
+ }};
+ }
+
+ @Test
+ public void testChangeServersVotingStatusWithNoLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+
+ Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
+ assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
+ }};
+ }
+
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final CountDownLatch snapshotPersist = new CountDownLatch(1);
}
}
+ private void countDownIfOther(final Member member, CountDownLatch latch) {
+ if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
+ latch.countDown();
+ }
+ }
+
@Override
public void handleCommand(Object message) throws Exception {
try{
if(message instanceof FindPrimary) {
findPrimaryMessageReceived.countDown();
} else if(message instanceof ClusterEvent.MemberUp) {
- String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUpReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived);
} else if(message instanceof ClusterEvent.MemberRemoved) {
- String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberRemovedReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived);
} else if(message instanceof ClusterEvent.UnreachableMember) {
- String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUnreachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived);
} else if(message instanceof ClusterEvent.ReachableMember) {
- String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberReachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived);
}
}
}
return new Builder(datastoreContextBuilder);
}
- private static class Builder extends AbstractGenericBuilder<Builder, TestShardManager> {
+ private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
private ActorRef shardActor;
private final Map<String, ActorRef> shardActors = new HashMap<>();
}
}
- private static abstract class AbstractGenericBuilder<T extends AbstractGenericBuilder<T, ?>, C extends ShardManager>
- extends ShardManager.AbstractBuilder<T> {
+ private static abstract class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
+ extends AbstractShardManagerCreator<T> {
private final Class<C> shardManagerClass;
- AbstractGenericBuilder(Class<C> shardManagerClass) {
+ AbstractGenericCreator(Class<C> shardManagerClass) {
this.shardManagerClass = shardManagerClass;
cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
}
}
- private static class GenericBuilder<C extends ShardManager> extends AbstractGenericBuilder<GenericBuilder<C>, C> {
- GenericBuilder(Class<C> shardManagerClass) {
+ private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
+ GenericCreator(Class<C> shardManagerClass) {
super(shardManagerClass);
}
}
private static class MockRespondActor extends MessageCollectorActor {
static final String CLEAR_RESPONSE = "clear-response";
- static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
- private volatile Object responseMsg;
+ private Object responseMsg;
+ private final Class<?> requestClass;
@SuppressWarnings("unused")
- public MockRespondActor() {
- }
-
- @SuppressWarnings("unused")
- public MockRespondActor(Object responseMsg) {
+ public MockRespondActor(Class<?> requestClass, Object responseMsg) {
+ this.requestClass = requestClass;
this.responseMsg = responseMsg;
}
- public void updateResponse(Object response) {
- responseMsg = response;
- }
-
@Override
public void onReceive(Object message) throws Exception {
- if(!"get-all-messages".equals(message)) {
- LOG.debug("Received message : {}", message);
- }
- super.onReceive(message);
- if (message instanceof AddServer && responseMsg != null) {
- getSender().tell(responseMsg, getSelf());
- } else if(message instanceof RemoveServer && responseMsg != null){
- getSender().tell(responseMsg, getSelf());
- } else if(message.equals(CLEAR_RESPONSE)) {
+ if(message.equals(CLEAR_RESPONSE)) {
responseMsg = null;
+ } else {
+ super.onReceive(message);
+ if (message.getClass().equals(requestClass) && responseMsg != null) {
+ getSender().tell(responseMsg, getSelf());
+ }
}
}
}