Bug 7362: Notify applyState synchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
index 9d64b140fb7d9db2c191eb12ffc847deb5273561..8cb914c2267b51375ef819ce26f9ff0818882347 100644 (file)
@@ -21,8 +21,14 @@ import static org.mockito.Mockito.verify;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.util.ArrayList;
@@ -31,15 +37,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActor;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorTest;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -50,9 +62,14 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -1087,6 +1104,208 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
     }
 
+    @Test
+    public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 1 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
+                MockRaftActor.toObject(snapshot.getState()));
+    }
+
+    @Test
+    public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 2 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
+
+        // Reinstate the actor from persistence
+
+        actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
+
+        followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
+        assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
+        assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
+        assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), followerRaftActor.get().getState());
+    }
+
+    @Test
+    public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
+        String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(1);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 0 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
+        assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
+                MockRaftActor.toObject(snapshot.getState()));
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
+        RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+            @Override
+            public void createSnapshot(ActorRef actorRef) {
+                try {
+                    actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
+                            followerRaftActor.get().getState()).toByteArray()), actorRef);
+                } catch (Exception e) {
+                    Throwables.propagate(e);
+                }
+            }
+
+            @Override
+            public void applySnapshot(byte[] snapshotBytes) {
+            }
+        };
+        return snapshotCohort;
+    }
+
     public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
         int snapshotLength = bs.size();
         int start = offset;