Bug 6540: EOS - handle edge case with pruning pending owner change commits
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
index 9660a2a39289890a74361f97b3d44a5cb9e2794d..c268e3eff59df6b81372822616dae5b2fa6cc63a 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.AdditionalMatchers.or;
 import static org.mockito.Matchers.any;
@@ -17,6 +20,7 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -39,6 +43,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
@@ -57,7 +62,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
@@ -272,12 +276,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
+        expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
 
         // Send a bunch of registration messages quickly and verify.
 
         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
-        MessageCollectorActor.clearMessages(leaderShard.collectorActor());
+        clearMessages(leaderShard.collectorActor());
 
         int max = 100;
         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
@@ -835,6 +839,189 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         testLog.info("testLeaderIsolation ending");
     }
 
+    @Test
+    public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
+        testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
+
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+        ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
+            shardIsolatedLeaderCheckIntervalInMillis(100000);
+
+        TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
+                actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString());
+        peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
+                actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
+        peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+                shardIsolatedLeaderCheckIntervalInMillis(500);
+
+        TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
+                actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
+
+        ShardTestKit.waitUntilLeader(leader);
+
+        // Add listeners on all members
+
+        DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
+                "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
+        leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
+                "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
+        peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
+                "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
+        peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        // Drop the CandidateAdded message to the leader for now.
+
+        leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
+
+        // Register the leader as a candidate for two entities. Since we've blocked the CandidateAdded messages,
+        // no owner will be assigned yet.
+
+        DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+        leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
+        leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        // Capture the CandidateAdded messages.
+
+        List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), CandidateAdded.class, 2);
+
+        // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
+        // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
+
+        peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+        peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+
+        // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
+
+        leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
+        leader.tell(candidateAdded.get(0), leader);
+        leader.tell(candidateAdded.get(1), leader);
+
+        expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, ae -> ae.getEntries().size() > 0);
+
+        // Verify no owner assigned.
+
+        verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
+        verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
+
+        // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+        leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
+        leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+        peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
+                ae -> ae.getLeaderId().equals(leaderId.toString()));
+        peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+        // Send PeerDown to the isolated leader - should be a no-op since there are no owned entities.
+
+        leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
+        leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+
+        // Verify the leader transitions to IsolatedLeader.
+
+        verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
+
+        // Send PeerDown to peer1, which is about to become the new leader.
+
+        peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+        // Make peer1 start an election and become leader by sending the TimeoutNow message.
+
+        peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+        // Verify that peer1 transitions to Leader.
+
+        verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
+
+        verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
+        verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
+
+        verifyNoMoreInteractions(peer1Listener);
+        verifyNoMoreInteractions(peer2Listener);
+
+        // Register peer1 as a candidate for entity2.
+
+        peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+
+        verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+        verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
+        verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
+
+        reset(leaderListener, peer1Listener, peer2Listener);
+
+        // Remove the isolation.
+
+        leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
+        leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+        peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+        peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+
+        // Previous leader should switch to Follower.
+
+        verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
+
+        // Send PeerUp to peer1 and peer2.
+
+        peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+        peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+        // The previous leader should become the owner of entity1.
+
+        verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
+        //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+        //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+        //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
+        //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
+        verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false),
+                ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true),
+                        ownershipChange(entity2, false, false, true))));
+
+        verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+        verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+
+        // Verify entity2's owner doesn't change.
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+
+        verifyNoMoreInteractions(leaderListener);
+        verifyNoMoreInteractions(peer1Listener);
+        verifyNoMoreInteractions(peer2Listener);
+
+        testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
+    }
+
     @Test
     public void testListenerRegistration() throws Exception {
         testLog.info("testListenerRegistration starting");
@@ -1063,14 +1250,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
         public void handleCommand(Object message) {
-            if(collectorActor != null) {
-                collectorActor.tell(message, ActorRef.noSender());
-            }
-
             Predicate drop = dropMessagesOfType.get(message.getClass());
             if(drop == null || !drop.test(message)) {
                 super.handleCommand(message);
             }
+
+            if(collectorActor != null) {
+                collectorActor.tell(message, ActorRef.noSender());
+            }
         }
 
         void startDroppingMessagesOfType(Class<?> msgClass) {