import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
}
@Test
- public void testOnRegisterCandidateLocal() throws Exception {
+ public void testOnRegisterCandidateLocal() {
testLog.info("testOnRegisterCandidateLocal starting");
ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
+ public void testOnRegisterCandidateLocalWithNoInitialLeader() {
testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
- ShardTestKit kit = new ShardTestKit(getSystem());
+ final ShardTestKit kit = new ShardTestKit(getSystem());
dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
}
@Test
- public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
+ public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
- ShardTestKit kit = new ShardTestKit(getSystem());
+ final ShardTestKit kit = new ShardTestKit(getSystem());
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
- shardTransactionCommitTimeoutInSeconds(1);
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
+ .shardTransactionCommitTimeoutInSeconds(1);
ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
- ShardTestKit kit = new ShardTestKit(getSystem());
+ final ShardTestKit kit = new ShardTestKit(getSystem());
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
- shardIsolatedLeaderCheckIntervalInMillis(50);
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
+ .shardIsolatedLeaderCheckIntervalInMillis(50);
ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
}
@Test
- public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
+ public void testOnRegisterCandidateLocalWithRemoteLeader() {
testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
ShardTestKit kit = new ShardTestKit(getSystem());
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
- shardBatchedModificationCount(5);
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
+ .shardBatchedModificationCount(5);
ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
- actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
- TestEntityOwnershipShard leaderShard = leader.underlyingActor();
+ actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
+ final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
- MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
+ expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
// Send a bunch of registration messages quickly and verify.
leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
- MessageCollectorActor.clearMessages(leaderShard.collectorActor());
+ clearMessages(leaderShard.collectorActor());
int max = 100;
List<YangInstanceIdentifier> entityIds = new ArrayList<>();
- for(int i = 1; i <= max; i++) {
+ for (int i = 1; i <= max; i++) {
YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
entityIds.add(id);
local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
}
- for(int i = 0; i < max; i++) {
+ for (int i = 0; i < max; i++) {
verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
}
}
@Test
- public void testOnUnregisterCandidateLocal() throws Exception {
+ public void testOnUnregisterCandidateLocal() {
testLog.info("testOnUnregisterCandidateLocal starting");
ShardTestKit kit = new ShardTestKit(getSystem());
}
@Test
- public void testOwnershipChanges() throws Exception {
+ public void testOwnershipChanges() {
testLog.info("testOwnershipChanges starting");
- ShardTestKit kit = new ShardTestKit(getSystem());
+ final ShardTestKit kit = new ShardTestKit(getSystem());
dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
- ShardTestKit kit = new ShardTestKit(getSystem());
+ final ShardTestKit kit = new ShardTestKit(getSystem());
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
- shardIsolatedLeaderCheckIntervalInMillis(100000);
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
+ .shardIsolatedLeaderCheckIntervalInMillis(100000);
ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
kit.watch(peer2);
peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
- kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+ kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
kit.unwatch(peer2);
leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
kit.watch(peer1);
peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
- kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+ kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
kit.unwatch(peer1);
leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
+ AtomicLong leaderLastApplied = new AtomicLong();
+ verifyRaftState(leader, rs -> {
+ assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
+ leaderLastApplied.set(rs.getLastApplied());
+ });
+
+ verifyRaftState(peer2, rs -> assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()));
+
// Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
// the entities (1 and 3) previously owned by the local leader member.
kit.watch(leader);
leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
- kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+ kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
kit.unwatch(leader);
peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
peer2.tell(TimeoutNow.INSTANCE, peer2);
public void testLeaderIsolation() throws Exception {
testLog.info("testLeaderIsolation starting");
- ShardTestKit kit = new ShardTestKit(getSystem());
+ final ShardTestKit kit = new ShardTestKit(getSystem());
ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
- shardIsolatedLeaderCheckIntervalInMillis(100000);
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
+ .shardIsolatedLeaderCheckIntervalInMillis(100000);
TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
peerId2.toString());
peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
- dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
- shardIsolatedLeaderCheckIntervalInMillis(500);
+ dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+ .shardIsolatedLeaderCheckIntervalInMillis(500);
TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
- verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true),
- ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
+ verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
+ ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
+ ownershipChange(entity3, false, false, true)));
reset(leaderListener);
DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
- verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
- ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true)));
+ verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
+ ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
+ ownershipChange(entity3, false, false, true)));
reset(peer1Listener);
DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
- verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
- ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true)));
+ verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
+ ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
+ ownershipChange(entity3, false, true, true)));
reset(peer2Listener);
// Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
- ae -> ae.getLeaderId().equals(leaderId.toString()));
+ ae -> ae.getLeaderId().equals(leaderId.toString()));
peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
// Make peer1 start an election and become leader by enabling the ElectionTimeout message.
// Expect inJeopardy notification on the isolated leader.
- verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true),
- ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true)));
+ verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
+ ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
+ ownershipChange(entity3, false, false, true, true)));
reset(leaderListener);
verifyRaftState(peer1, state ->
verifyRaftState(leader, state ->
assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
- verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true),
- ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
+ verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
+ ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
+ ownershipChange(entity3, false, false, true)));
verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
}
@Test
- public void testListenerRegistration() throws Exception {
+ public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
+ testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
+
+ final ShardTestKit kit = new ShardTestKit(getSystem());
+
+ ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+ ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+ ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
+
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
+ .shardIsolatedLeaderCheckIntervalInMillis(100000);
+
+ TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+ newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
+ actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
+ peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+ TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+ newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
+ actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
+ peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+ dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+ .shardIsolatedLeaderCheckIntervalInMillis(500);
+
+ TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+ newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
+ actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
+
+ ShardTestKit.waitUntilLeader(leader);
+
+ // Add listeners on all members
+
+ DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
+ "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
+ leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
+ "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
+ peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
+ "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
+ peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Drop the CandidateAdded message to the leader for now.
+
+ leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
+
+ // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
+ // assigned the owner.
+
+ DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+ leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+ verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+ verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+ DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
+ leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+ verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+ verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+
+ // Capture the CandidateAdded messages.
+
+ final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
+ CandidateAdded.class, 2);
+
+ // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
+ // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
+
+ peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+ peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+
+ // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
+
+ leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
+ leader.tell(candidateAdded.get(0), leader);
+ leader.tell(candidateAdded.get(1), leader);
+
+ expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
+ ae -> ae.getEntries().size() > 0);
+
+ // Verify no owner assigned.
+
+ verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
+ verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
+
+ // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+ leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
+ leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+ peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
+ ae -> ae.getLeaderId().equals(leaderId.toString()));
+ peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+ // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
+
+ leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
+ leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+
+ // Verify the leader transitions to IsolatedLeader.
+
+ verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
+ state.getRaftState()));
+
+ // Send PeerDown to the new leader peer1.
+
+ peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+ // Make peer1 start an election and become leader by sending the TimeoutNow message.
+
+ peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+ // Verify the peer1 transitions to Leader.
+
+ verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
+ state.getRaftState()));
+
+ verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
+ verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
+
+ verifyNoMoreInteractions(peer1Listener);
+ verifyNoMoreInteractions(peer2Listener);
+
+ // Add candidate peer1 candidate for entity2.
+
+ peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+
+ verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+ verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
+ verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
+
+ reset(leaderListener, peer1Listener, peer2Listener);
+
+ // Remove the isolation.
+
+ leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
+ leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+ peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+ peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+
+ // Previous leader should switch to Follower.
+
+ verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
+ state.getRaftState()));
+
+ // Send PeerUp to peer1 and peer2.
+
+ peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+ peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+ // The previous leader should become the owner of entity1.
+
+ verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+ // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
+ // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+ // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+ // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
+ // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
+ verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
+ or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
+ or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
+
+ verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+ verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+
+ // Verify entity2's owner doesn't change.
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+
+ verifyNoMoreInteractions(leaderListener);
+ verifyNoMoreInteractions(peer1Listener);
+ verifyNoMoreInteractions(peer2Listener);
+
+ testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
+ }
+
+ @Test
+ public void testListenerRegistration() {
testLog.info("testListenerRegistration starting");
ShardTestKit kit = new ShardTestKit(getSystem());
ShardTestKit.waitUntilLeader(leader);
String otherEntityType = "otherEntityType";
- DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
- DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
- DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
- DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
+ final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+ final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
+ final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
+ final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
// Register listener
}
@Test
- public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
+ public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
ShardTestKit kit = new ShardTestKit(getSystem());
- EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
- addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
+ EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
+ .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
- newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString());
+ newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
+ leaderId.toString());
ShardTestKit.waitUntilLeader(leader);
}
@Test
- public void testDelayedEntityOwnerSelection() throws Exception {
+ public void testDelayedEntityOwnerSelection() {
testLog.info("testDelayedEntityOwnerSelection starting");
- ShardTestKit kit = new ShardTestKit(getSystem());
- EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
- addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
+ final ShardTestKit kit = new ShardTestKit(getSystem());
+ EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
+ .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
- newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()),
- leaderId.toString());
+ newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
+ builder.build()), leaderId.toString());
ShardTestKit.waitUntilLeader(leader);
return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
}
- private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
+ private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
+ final String memberName) {
return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
}
- private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
- EntityOwnerSelectionStrategyConfig config) {
+ private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
+ final EntityOwnerSelectionStrategyConfig config) {
return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
.withDispatcher(Dispatchers.DefaultDispatcherId());
}
- private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
- String memberName) {
+ private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
+ final String memberName) {
return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
- dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
+ dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
MemberName.forName(memberName)).ownerSelectionStrategyConfig(
EntityOwnerSelectionStrategyConfig.newBuilder().build());
}
- private Map<String, String> peerMap(String... peerIds) {
+ private Map<String, String> peerMap(final String... peerIds) {
ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
- for(String peerId: peerIds) {
+ for (String peerId: peerIds) {
builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
}
}
private static class TestEntityOwnershipShard extends EntityOwnershipShard {
- private final TestActorRef<MessageCollectorActor> collectorActor;
+ private final ActorRef collectorActor;
private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
- TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+ TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
super(builder);
this.collectorActor = collectorActor;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void handleCommand(Object message) {
- if(collectorActor != null) {
- collectorActor.tell(message, ActorRef.noSender());
- }
-
+ public void handleCommand(final Object message) {
Predicate drop = dropMessagesOfType.get(message.getClass());
- if(drop == null || !drop.test(message)) {
+ if (drop == null || !drop.test(message)) {
super.handleCommand(message);
}
+
+ if (collectorActor != null) {
+ collectorActor.tell(message, ActorRef.noSender());
+ }
}
- void startDroppingMessagesOfType(Class<?> msgClass) {
+ void startDroppingMessagesOfType(final Class<?> msgClass) {
dropMessagesOfType.put(msgClass, msg -> true);
}
- <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
+ <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
dropMessagesOfType.put(msgClass, filter);
}
- void stopDroppingMessagesOfType(Class<?> msgClass) {
+ void stopDroppingMessagesOfType(final Class<?> msgClass) {
dropMessagesOfType.remove(msgClass);
}
- TestActorRef<MessageCollectorActor> collectorActor() {
+ ActorRef collectorActor() {
return collectorActor;
}
- static Props props(Builder builder) {
+ static Props props(final Builder builder) {
return props(builder, null);
}
- static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
- return Props.create(TestEntityOwnershipShard.class, builder, collectorActor).
- withDispatcher(Dispatchers.DefaultDispatcherId());
+ static Props props(final Builder builder, final ActorRef collectorActor) {
+ return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
+ .withDispatcher(Dispatchers.DefaultDispatcherId());
}
}
}