X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=15fca0cb8d80696393e30ea465d5263ba339fa91;hb=ddd479df27cfc49f353ceb66cd289694e891761a;hp=9ca2ba0a6b40e27766938d1d1298735aab676421;hpb=e1eca73a5ae2ffae8dd78c6fe5281cd2f45d5ef3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 9ca2ba0a6b..15fca0cb8d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -21,25 +21,38 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.MockRaftActor; +import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -50,8 +63,17 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; +import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; @@ -90,7 +112,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef) { MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); - context.setPayloadVersion(payloadVersion ); + context.setPayloadVersion(payloadVersion); return context; } @@ -791,7 +813,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - ByteString bsSnapshot = createSnapshot(); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -820,7 +842,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastAppliedIndex()); assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); - Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getState type", ByteState.class, snapshot.getState().getClass()); + Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes()); assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); applySnapshot.getCallback().onSuccess(); @@ -1086,6 +1109,219 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { verify(follower, never()).scheduleElection(any(FiniteDuration.class)); } + @Test + public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception { + String id = "testCaptureSnapshotOnLastEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 1 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()), + MockRaftActor.fromState(snapshot.getState())); + } + + @Test + public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception { + String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 2 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState())); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex()); + + // Reinstate the actor from persistence + + actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem())); + + followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex()); + assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied()); + assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex()); + assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), followerRaftActor.get().getState()); + } + + @Test + public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception { + String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(1); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 0 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex()); + assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()), + MockRaftActor.fromState(snapshot.getState())); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference followerRaftActor) { + RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { + @Override + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + try { + actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()), + installSnapshotStream), actorRef); + } catch (Exception e) { + Throwables.propagate(e); + } + } + + @Override + public void applySnapshot(State snapshotState) { + } + + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); + } + }; + return snapshotCohort; + } + public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) { int snapshotLength = bs.size(); int start = offset; @@ -1126,7 +1362,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { - return new MockRaftActorContext.MockReplicatedLogEntry(term, index, + return new SimpleReplicatedLogEntry(index, term, new MockRaftActorContext.MockPayload(data)); }