import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.junit.After;
import org.junit.Test;
}
@Test
- public void testOnRegisterCandidateLocal() throws Exception {
+ public void testOnRegisterCandidateLocal() {
testLog.info("testOnRegisterCandidateLocal starting");
ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
+ public void testOnRegisterCandidateLocalWithNoInitialLeader() {
testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
final ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
+ public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
final ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
+ public void testOnRegisterCandidateLocalWithRemoteLeader() {
testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
ShardTestKit kit = new ShardTestKit(getSystem());
ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
- actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
+ actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
}
@Test
- public void testOnUnregisterCandidateLocal() throws Exception {
+ public void testOnUnregisterCandidateLocal() {
testLog.info("testOnUnregisterCandidateLocal starting");
ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testOwnershipChanges() throws Exception {
+ public void testOwnershipChanges() {
testLog.info("testOwnershipChanges starting");
final ShardTestKit kit = new ShardTestKit(getSystem());
kit.watch(peer2);
peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
- kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+ kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
kit.unwatch(peer2);
leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
kit.watch(peer1);
peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
- kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+ kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
kit.unwatch(peer1);
leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
+ AtomicLong leaderLastApplied = new AtomicLong();
+ verifyRaftState(leader, rs -> {
+ assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
+ leaderLastApplied.set(rs.getLastApplied());
+ });
+
+ verifyRaftState(peer2, rs -> assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()));
+
// Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
// the entities (1 and 3) previously owned by the local leader member.
kit.watch(leader);
leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
- kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+ kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
kit.unwatch(leader);
peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
peer2.tell(TimeoutNow.INSTANCE, peer2);
TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
- actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString());
+ actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
}
@Test
- public void testListenerRegistration() throws Exception {
+ public void testListenerRegistration() {
testLog.info("testListenerRegistration starting");
ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
+ public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testDelayedEntityOwnerSelection() throws Exception {
+ public void testDelayedEntityOwnerSelection() {
testLog.info("testDelayedEntityOwnerSelection starting");
final ShardTestKit kit = new ShardTestKit(getSystem());
return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
}
- private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
+ private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
+ final String memberName) {
return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
}
- private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
- EntityOwnerSelectionStrategyConfig config) {
+ private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
+ final EntityOwnerSelectionStrategyConfig config) {
return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
.withDispatcher(Dispatchers.DefaultDispatcherId());
}
- private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
- String memberName) {
+ private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
+ final String memberName) {
return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
- dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
+ dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
MemberName.forName(memberName)).ownerSelectionStrategyConfig(
EntityOwnerSelectionStrategyConfig.newBuilder().build());
}
- private Map<String, String> peerMap(String... peerIds) {
+ private Map<String, String> peerMap(final String... peerIds) {
ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
for (String peerId: peerIds) {
builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
}
private static class TestEntityOwnershipShard extends EntityOwnershipShard {
- private final TestActorRef<MessageCollectorActor> collectorActor;
+ private final ActorRef collectorActor;
private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
- TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+ TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
super(builder);
this.collectorActor = collectorActor;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void handleCommand(Object message) {
+ public void handleCommand(final Object message) {
Predicate drop = dropMessagesOfType.get(message.getClass());
if (drop == null || !drop.test(message)) {
super.handleCommand(message);
}
}
- void startDroppingMessagesOfType(Class<?> msgClass) {
+ void startDroppingMessagesOfType(final Class<?> msgClass) {
dropMessagesOfType.put(msgClass, msg -> true);
}
- <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
+ <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
dropMessagesOfType.put(msgClass, filter);
}
- void stopDroppingMessagesOfType(Class<?> msgClass) {
+ void stopDroppingMessagesOfType(final Class<?> msgClass) {
dropMessagesOfType.remove(msgClass);
}
- TestActorRef<MessageCollectorActor> collectorActor() {
+ ActorRef collectorActor() {
return collectorActor;
}
- static Props props(Builder builder) {
+ static Props props(final Builder builder) {
return props(builder, null);
}
- static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+ static Props props(final Builder builder, final ActorRef collectorActor) {
return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
.withDispatcher(Dispatchers.DefaultDispatcherId());
}