X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=34932c7249a38f753856cdb99addf72d221f0722;hb=08dd5c2c443ff53f56af88a0e8dc8f34e36d2245;hp=9eb2fb757bd2da0c6b2ce48ebbfeb72471ef48c1;hpb=1b69a9ae877e79ba6addb1b0e343692b2acff1ec;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 9eb2fb757b..34932c7249 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,5 +1,18 @@ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; @@ -21,68 +34,75 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; -import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - public class RaftActorTest extends AbstractActorTest { + private TestActorFactory factory; + + @Before + public void setUp(){ + factory = new TestActorFactory(getSystem()); + } @After - public void tearDown() { - MockAkkaJournal.clearJournal(); - MockSnapshotStore.setMockSnapshot(null); + public void tearDown() throws Exception { + factory.close(); + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } public static class MockRaftActor extends RaftActor { - private final DataPersistenceProvider dataPersistenceProvider; + protected DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final List state; + private ActorRef roleChangeNotifier; + private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); public static final class MockRaftActorCreator implements Creator { private static final long serialVersionUID = 1L; @@ -90,26 +110,29 @@ public class RaftActorTest extends AbstractActorTest { private final String id; private final Optional config; private final DataPersistenceProvider dataPersistenceProvider; + private final ActorRef roleChangeNotifier; private MockRaftActorCreator(Map peerAddresses, String id, - Optional config, DataPersistenceProvider dataPersistenceProvider) { + Optional config, DataPersistenceProvider dataPersistenceProvider, + ActorRef roleChangeNotifier) { this.peerAddresses = peerAddresses; this.id = id; this.config = config; this.dataPersistenceProvider = dataPersistenceProvider; + this.roleChangeNotifier = roleChangeNotifier; } @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider); + MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config, + dataPersistenceProvider); + mockRaftActor.roleChangeNotifier = this.roleChangeNotifier; + return mockRaftActor; } } - private final CountDownLatch recoveryComplete = new CountDownLatch(1); - - private final List state; - - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { + public MockRaftActor(String id, Map peerAddresses, Optional config, + DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); this.delegate = mock(RaftActor.class); @@ -128,29 +151,38 @@ public class RaftActorTest extends AbstractActorTest { } } + public void waitForInitializeBehaviorComplete() { + try { + assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + public List getState() { return state; } public static Props props(final String id, final Map peerAddresses, Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null)); } public static Props props(final String id, final Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); + } @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); - LOG.info("applyState called"); + LOG.info("{}: applyState called", persistenceId()); } - - - @Override protected void startLogRecoveryBatch(int maxBatchSize) { } @@ -171,11 +203,16 @@ public class RaftActorTest extends AbstractActorTest { } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { - delegate.applyRecoverySnapshot(snapshot); + protected void initializeBehavior() { + super.initializeBehavior(); + initializeBehaviorComplete.countDown(); + } + + @Override + protected void applyRecoverySnapshot(byte[] bytes) { + delegate.applyRecoverySnapshot(bytes); try { - Object data = toObject(snapshot); - System.out.println("!!!!!applyRecoverySnapshot: "+data); + Object data = toObject(bytes); if (data instanceof List) { state.addAll((List) data); } @@ -185,10 +222,12 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void createSnapshot() { + LOG.info("{}: createSnapshot called", persistenceId()); delegate.createSnapshot(); } - @Override protected void applySnapshot(ByteString snapshot) { + @Override protected void applySnapshot(byte [] snapshot) { + LOG.info("{}: applySnapshot called", persistenceId()); delegate.applySnapshot(snapshot); } @@ -201,16 +240,21 @@ public class RaftActorTest extends AbstractActorTest { return this.dataPersistenceProvider; } + @Override + protected Optional getRoleChangeNotifier() { + return Optional.fromNullable(roleChangeNotifier); + } + @Override public String persistenceId() { return this.getId(); } - private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; try { - bis = new ByteArrayInputStream(bs.toByteArray()); + bis = new ByteArrayInputStream(bs); ois = new ObjectInputStream(bis); obj = ois.readObject(); } finally { @@ -231,7 +275,7 @@ public class RaftActorTest extends AbstractActorTest { } - private static class RaftActorTestKit extends JavaTestKit { + public static class RaftActorTestKit extends JavaTestKit { private final ActorRef raftActor; public RaftActorTestKit(ActorSystem actorSystem, String actorName) { @@ -267,7 +311,7 @@ public class RaftActorTest extends AbstractActorTest { waitUntilLeader(raftActor); } - protected void waitUntilLeader(ActorRef actorRef) { + public static void waitUntilLeader(ActorRef actorRef) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration)); @@ -306,7 +350,7 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftActorRecovery() throws Exception { new JavaTestKit(getSystem()) {{ - String persistenceId = "follower10"; + String persistenceId = factory.generateActorId("follower-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); // Set the heartbeat interval high to essentially disable election otherwise the test @@ -314,8 +358,8 @@ public class RaftActorTest extends AbstractActorTest { // log entry. config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config)), persistenceId); + ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config)), persistenceId); watch(followerActor); @@ -327,18 +371,17 @@ public class RaftActorTest extends AbstractActorTest { int lastAppliedDuringSnapshotCapture = 3; int lastIndexDuringSnapshotCapture = 4; - // 4 messages as part of snapshot, which are applied to state - ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + // 4 messages as part of snapshot, which are applied to state + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , + snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1); - MockSnapshotStore.setMockSnapshot(snapshot); - MockSnapshotStore.setPersistenceId(persistenceId); + InMemorySnapshotStore.addSnapshot(persistenceId, snapshot); // add more entries after snapshot is taken List entries = new ArrayList<>(); @@ -355,11 +398,11 @@ public class RaftActorTest extends AbstractActorTest { int lastAppliedToState = 5; int lastIndex = 7; - MockAkkaJournal.addToJournal(5, entry2); + InMemoryJournal.addEntry(persistenceId, 5, entry2); // 2 entries are applied to state besides the 4 entries in snapshot - MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); - MockAkkaJournal.addToJournal(7, entry3); - MockAkkaJournal.addToJournal(8, entry4); + InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState)); + InMemoryJournal.addEntry(persistenceId, 7, entry3); + InMemoryJournal.addEntry(persistenceId, 8, entry4); // kill the actor followerActor.tell(PoisonPill.getInstance(), null); @@ -368,8 +411,8 @@ public class RaftActorTest extends AbstractActorTest { unwatch(followerActor); //reinstate the actor - TestActorRef ref = TestActorRef.create(getSystem(), - MockRaftActor.props(persistenceId, Collections.emptyMap(), + TestActorRef ref = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config))); ref.underlyingActor().waitForRecoveryComplete(); @@ -384,6 +427,46 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + // Setup the persisted journal with some entries + ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload("zero")); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("oen")); + ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + new MockRaftActorContext.MockPayload("two")); + + long seqNr = 1; + InMemoryJournal.addEntry(persistenceId, seqNr++, entry0); + InMemoryJournal.addEntry(persistenceId, seqNr++, entry1); + InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1)); + InMemoryJournal.addEntry(persistenceId, seqNr++, entry2); + + int lastAppliedToState = 1; + int lastIndex = 2; + + //reinstate the actor + TestActorRef leaderActor = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), + Optional.of(config))); + + leaderActor.underlyingActor().waitForRecoveryComplete(); + + RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext(); + assertEquals("Journal log size", 3, context.getReplicatedLog().size()); + assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); + assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); + }}; + } + /** * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does * process recovery messages @@ -395,32 +478,32 @@ public class RaftActorTest extends AbstractActorTest { public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config)), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config)), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); // Wait for akka's recovery to complete so it doesn't interfere. mockRaftActor.waitForRecoveryComplete(); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - Lists.newArrayList(), 3, 1 ,3, 1); + Lists.newArrayList(), 3, 1, 3, 1); mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes)); + verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -432,7 +515,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 2, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -449,8 +532,6 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - }}; } @@ -464,32 +545,32 @@ public class RaftActorTest extends AbstractActorTest { public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); // Wait for akka's recovery to complete so it doesn't interfere. mockRaftActor.waitForRecoveryComplete(); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - Lists.newArrayList(), 3, 1 ,3, 1); + Lists.newArrayList(), 3, 1, 3, 1); mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class)); + verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -501,7 +582,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -515,8 +596,6 @@ public class RaftActorTest extends AbstractActorTest { assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - - mockActorRef.tell(PoisonPill.getInstance(), getRef()); }}; } @@ -525,7 +604,7 @@ public class RaftActorTest extends AbstractActorTest { public void testUpdatingElectionTermCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testUpdatingElectionTermCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -535,17 +614,16 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); dataPersistenceProviderMonitor.setPersistLatch(persistLatch); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar"); assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); - - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -554,7 +632,7 @@ public class RaftActorTest extends AbstractActorTest { public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -562,19 +640,18 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)); mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry); verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class)); - - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -583,7 +660,7 @@ public class RaftActorTest extends AbstractActorTest { public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -591,28 +668,27 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); - - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @Test - public void testApplyLogEntriesCallsDataPersistence() throws Exception { + public void testApplyJournalEntriesCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testApplyLogEntriesCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -620,18 +696,19 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - mockRaftActor.onReceiveCommand(new ApplyLogEntries(10)); + mockRaftActor.waitForInitializeBehaviorComplete(); - verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); + verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); } + }; } @@ -639,7 +716,7 @@ public class RaftActorTest extends AbstractActorTest { public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -647,30 +724,30 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), - MockRaftActor.props(persistenceId,Collections.emptyMap(), - Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), + Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - ByteString snapshotBytes = fromObject(Arrays.asList( + mockRaftActor.waitForInitializeBehaviorComplete(); + + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); verify(dataPersistenceProvider).saveSnapshot(anyObject()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -679,7 +756,7 @@ public class RaftActorTest extends AbstractActorTest { public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -687,16 +764,18 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class))); + mockRaftActor.waitForInitializeBehaviorComplete(); + + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -707,11 +786,12 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); + long replicatedToAllIndex = 1; + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); verify(mockRaftActor.delegate).createSnapshot(); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100))); @@ -719,15 +799,16 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).deleteMessages(100); - assertEquals(2, mockRaftActor.getReplicatedLog().size()); + assertEquals(3, mockRaftActor.getReplicatedLog().size()); + assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex()); + assertNotNull(mockRaftActor.getReplicatedLog().get(2)); assertNotNull(mockRaftActor.getReplicatedLog().get(3)); assertNotNull(mockRaftActor.getReplicatedLog().get(4)); // Index 2 will not be in the log because it was removed due to snapshotting - assertNull(mockRaftActor.getReplicatedLog().get(2)); - - mockActorRef.tell(PoisonPill.getInstance(), getRef()); + assertNull(mockRaftActor.getReplicatedLog().get(1)); + assertNull(mockRaftActor.getReplicatedLog().get(0)); } }; @@ -738,7 +819,7 @@ public class RaftActorTest extends AbstractActorTest { new JavaTestKit(getSystem()) { { - String persistenceId = "testApplyState"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -746,11 +827,13 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); @@ -758,8 +841,6 @@ public class RaftActorTest extends AbstractActorTest { verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -768,7 +849,7 @@ public class RaftActorTest extends AbstractActorTest { public void testApplySnapshot() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testApplySnapshot"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -776,24 +857,26 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog(); - oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class))); - oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class))); + oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); + oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); oldReplicatedLog.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, - mock(Payload.class))); + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + mock(Payload.class))); ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = mock(Snapshot.class); @@ -803,18 +886,16 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes)); + verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", - oldReplicatedLog != mockRaftActor.getReplicatedLog()); + oldReplicatedLog != mockRaftActor.getReplicatedLog()); assertEquals("lastApplied should be same as in the snapshot", - (Long) 3L, mockRaftActor.getLastApplied()); + (Long) 3L, mockRaftActor.getLastApplied()); assertEquals(0, mockRaftActor.getReplicatedLog().size()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -823,7 +904,7 @@ public class RaftActorTest extends AbstractActorTest { public void testSaveSnapshotFailure() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testSaveSnapshotFailure"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -831,12 +912,14 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - ByteString snapshotBytes = fromObject(Arrays.asList( + mockRaftActor.waitForInitializeBehaviorComplete(); + + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), @@ -846,9 +929,9 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L), new Exception())); @@ -856,12 +939,486 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Snapshot index should not have advanced because save snapshot failed", -1, mockRaftActor.getReplicatedLog().getSnapshotIndex()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); + } + }; + } + + @Test + public void testRaftRoleChangeNotifier() throws Exception { + new JavaTestKit(getSystem()) {{ + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(1); + + String persistenceId = factory.generateActorId("notifier-"); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); + + List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); + + // check if the notifier got a role change from null to Follower + RoleChanged raftRoleChanged = matches.get(0); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertNull(raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Follower to Candidate + raftRoleChanged = matches.get(1); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Candidate to Leader + raftRoleChanged = matches.get(2); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + + assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + + notifierActor.underlyingActor().clear(); + + MockRaftActor raftActor = raftActorRef.underlyingActor(); + final String newLeaderId = "new-leader"; + Follower follower = new Follower(raftActor.getRaftActorContext()) { + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + leaderId = newLeaderId; + return this; + } + }; + + raftActor.changeCurrentBehavior(follower); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(null, leaderStateChange.getLeaderId()); + + raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(newLeaderId, leaderStateChange.getLeaderId()); + }}; + } + + @Test + public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = factory.generateActorId("leader-"); + String follower1Id = factory.generateActorId("follower-"); + + ActorRef followerActor1 = + factory.createActor(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1Id, followerActor1.path().toString()); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + + leaderActor.getRaftActorContext().setCommitIndex(4); + leaderActor.getRaftActorContext().setLastApplied(4); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(leaderActor.delegate).createSnapshot(); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + //fake snapshot on index 5 + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1)); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1)); + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, leaderActor.getReplicatedLog().size()); + assertEquals(7, leaderActor.getReplicatedLog().lastIndex()); + + // add another non-replicated entry + leaderActor.getReplicatedLog().append( + new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); + + //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1)); + assertEquals(2, leaderActor.getReplicatedLog().size()); + assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); } }; } + @Test + public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = factory.generateActorId("follower-"); + String leaderId = factory.generateActorId("leader-"); + + + ActorRef leaderActor1 = + factory.createActor(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(leaderId, leaderActor1.path().toString()); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor followerActor = mockActorRef.underlyingActor(); + followerActor.getRaftActorContext().setCommitIndex(4); + followerActor.getRaftActorContext().setLastApplied(4); + followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + followerActor.waitForInitializeBehaviorComplete(); + + + Follower follower = new Follower(followerActor.getRaftActorContext()); + followerActor.setCurrentBehavior(follower); + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); + + // log has indices 0-5 + assertEquals(6, followerActor.getReplicatedLog().size()); + + //snapshot on 4 + followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(followerActor.delegate).createSnapshot(); + + assertEquals(6, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("foo-6")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5)); + assertEquals(7, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 7 + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, + new MockRaftActorContext.MockPayload("foo-7")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6)); + assertEquals(8, followerActor.getReplicatedLog().size()); + + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex + assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log + assertEquals(7, followerActor.getReplicatedLog().lastIndex()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8, + new MockRaftActorContext.MockPayload("foo-7")) + ); + // send an additional entry 8 with leaderCommit = 7 + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7)); + + // 7 and 8, as lastapplied is 7 + assertEquals(2, followerActor.getReplicatedLog().size()); + + } + }; + } + + @Test + public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = factory.generateActorId("leader-"); + String follower1Id = factory.generateActorId("follower-"); + String follower2Id = factory.generateActorId("follower-"); + + ActorRef followerActor1 = + factory.createActor(Props.create(MessageCollectorActor.class), follower1Id); + ActorRef followerActor2 = + factory.createActor(Props.create(MessageCollectorActor.class), follower2Id); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1Id, followerActor1.path().toString()); + peerAddresses.put(follower2Id, followerActor2.path().toString()); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(9); + leaderActor.getRaftActorContext().setLastApplied(9); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // create 5 entries in the log + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build()); + + //set the snapshot index to 4 , 0 to 4 are snapshotted + leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); + //setting replicatedToAllIndex = 9, for the log to clear + leader.setReplicatedToAllIndex(9); + assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); + assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // set the 2nd follower nextIndex to 1 which has been snapshotted + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); + assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // simulate a real snapshot + leaderActor.onReceiveCommand(new SendHeartBeat()); + assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ", + leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()) + , RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + + //reply from a slow follower does not initiate a fake snapshot + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1)); + assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); + + //reply from a slow follower after should not raise errors + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1)); + assertEquals(0, leaderActor.getReplicatedLog().size()); + } + }; + } + + + private static class NonPersistentProvider implements DataPersistenceProvider { + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + try { + procedure.apply(o); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void saveSnapshot(Object o) { + + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + + } + + @Override + public void deleteMessages(long sequenceNumber) { + + } + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + for(int i=0;i< 4;i++) { + leaderActor.getReplicatedLog() + .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i, + new MockRaftActorContext.MockPayload("A"))); + } + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(-1, leader.getReplicatedToAllIndex()); + + }}; + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + leaderActor.getReplicatedLog().setSnapshotIndex(3); + + leaderActor.waitForInitializeBehaviorComplete(); + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + leader.setReplicatedToAllIndex(3); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(3, leader.getReplicatedToAllIndex()); + + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; @@ -881,4 +1438,5 @@ public class RaftActorTest extends AbstractActorTest { } } } + }