X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardTest.java;h=c268e3eff59df6b81372822616dae5b2fa6cc63a;hp=9660a2a39289890a74361f97b3d44a5cb9e2794d;hb=0fab6c716548e89938c1a8493dc25991c006aa10;hpb=7ea5e81beb0b5d265713e01a14cfa2562ea28c6c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 9660a2a392..c268e3eff5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -7,6 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.junit.Assert.assertEquals; import static org.mockito.AdditionalMatchers.or; import static org.mockito.Matchers.any; @@ -17,6 +20,7 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -39,6 +43,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.ShardTestKit; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; @@ -57,7 +62,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; -import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; @@ -272,12 +276,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); + expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); // Send a bunch of registration messages quickly and verify. leaderShard.stopDroppingMessagesOfType(BatchedModifications.class); - MessageCollectorActor.clearMessages(leaderShard.collectorActor()); + clearMessages(leaderShard.collectorActor()); int max = 100; List entityIds = new ArrayList<>(); @@ -835,6 +839,189 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { testLog.info("testLeaderIsolation ending"); } + @Test + public void testLeaderIsolationWithPendingCandidateAdded() throws Exception { + testLog.info("testLeaderIsolationWithPendingCandidateAdded starting"); + + ShardTestKit kit = new ShardTestKit(getSystem()); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). + shardIsolatedLeaderCheckIntervalInMillis(100000); + + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()). + shardIsolatedLeaderCheckIntervalInMillis(500); + + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); + + // Add listeners on all members + + DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME); + leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME); + peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME); + peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Drop the CandidateAdded message to the leader for now. + + leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class); + + // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be + // assigned the owner. + + DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + + // Capture the CandidateAdded messages. + + List candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), CandidateAdded.class, 2); + + // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we + // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted. + + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + + // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries. + + leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class); + leader.tell(candidateAdded.get(0), leader); + leader.tell(candidateAdded.get(1), leader); + + expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, ae -> ae.getEntries().size() > 0); + + // Verify no owner assigned. + + verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier()); + + // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. + + leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, + ae -> ae.getLeaderId().equals(leaderId.toString())); + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + // Send PeerDown to the isolated leader - should be no-op since there's no owned entities. + + leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); + + // Verify the leader transitions to IsolatedLeader. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState())); + + // Send PeerDown to the new leader peer1. + + peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // Make peer1 start an election and become leader by sending the TimeoutNow message. + + peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + + // Verify the peer1 transitions to Leader. + + verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); + + verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier()); + + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + // Add candidate peer1 candidate for entity2. + + peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true)); + + reset(leaderListener, peer1Listener, peer2Listener); + + // Remove the isolation. + + leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + + // Previous leader should switch to Follower. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState())); + + // Send PeerUp to peer1 and peer2. + + peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // The previous leader should become the owner of entity1. + + verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + // The previous leader's DOMEntityOwnershipListener should get 4 total notifications: + // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false) + // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false) + verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false), + ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true), + ownershipChange(entity2, false, false, true)))); + + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + + // Verify entity2's owner doesn't change. + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + + verifyNoMoreInteractions(leaderListener); + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + testLog.info("testLeaderIsolationWithPendingCandidateAdded ending"); + } + @Test public void testListenerRegistration() throws Exception { testLog.info("testListenerRegistration starting"); @@ -1063,14 +1250,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void handleCommand(Object message) { - if(collectorActor != null) { - collectorActor.tell(message, ActorRef.noSender()); - } - Predicate drop = dropMessagesOfType.get(message.getClass()); if(drop == null || !drop.test(message)) { super.handleCommand(message); } + + if(collectorActor != null) { + collectorActor.tell(message, ActorRef.noSender()); + } } void startDroppingMessagesOfType(Class msgClass) {