X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=17a81ac3c39aa9c0a27743e1739892629a157ba8;hp=59046fd779fb1319ed4d8f48bb8f6911b9eae301;hb=9b1894ccc1808f08e91232c6a67e5708d14ea2ea;hpb=766613c8c5cbfe2e790e6d7b4531227899e84f2c diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 59046fd779..17a81ac3c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -31,6 +31,7 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -53,24 +54,28 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; -import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -88,13 +93,12 @@ public class RaftActorTest extends AbstractActorTest { @After public void tearDown() throws Exception { factory.close(); - MockAkkaJournal.clearJournal(); - MockSnapshotStore.setMockSnapshot(null); + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } public static class MockRaftActor extends RaftActor { - private final DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; @@ -134,9 +138,9 @@ public class RaftActorTest extends AbstractActorTest { state = new ArrayList<>(); this.delegate = mock(RaftActor.class); if(dataPersistenceProvider == null){ - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); } else { - this.dataPersistenceProvider = dataPersistenceProvider; + setPersistence(dataPersistenceProvider); } } @@ -156,6 +160,16 @@ public class RaftActorTest extends AbstractActorTest { } } + + public void waitUntilLeader(){ + for(int i = 0;i < 10; i++){ + if(isLeader()){ + break; + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + public List getState() { return state; } @@ -175,9 +189,16 @@ public class RaftActorTest extends AbstractActorTest { return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier, + DataPersistenceProvider dataPersistenceProvider){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); - LOG.info("applyState called"); + LOG.info("{}: applyState called", persistenceId()); } @Override @@ -219,10 +240,12 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void createSnapshot() { + LOG.info("{}: createSnapshot called", persistenceId()); delegate.createSnapshot(); } @Override protected void applySnapshot(byte [] snapshot) { + LOG.info("{}: applySnapshot called", persistenceId()); delegate.applySnapshot(snapshot); } @@ -230,11 +253,6 @@ public class RaftActorTest extends AbstractActorTest { delegate.onStateChanged(); } - @Override - protected DataPersistenceProvider persistence() { - return this.dataPersistenceProvider; - } - @Override protected Optional getRoleChangeNotifier() { return Optional.fromNullable(roleChangeNotifier); @@ -270,7 +288,7 @@ public class RaftActorTest extends AbstractActorTest { } - private static class RaftActorTestKit extends JavaTestKit { + public static class RaftActorTestKit extends JavaTestKit { private final ActorRef raftActor; public RaftActorTestKit(ActorSystem actorSystem, String actorName) { @@ -306,7 +324,7 @@ public class RaftActorTest extends AbstractActorTest { waitUntilLeader(raftActor); } - protected void waitUntilLeader(ActorRef actorRef) { + public static void waitUntilLeader(ActorRef actorRef) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration)); @@ -376,8 +394,7 @@ public class RaftActorTest extends AbstractActorTest { Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1); - MockSnapshotStore.setMockSnapshot(snapshot); - MockSnapshotStore.setPersistenceId(persistenceId); + InMemorySnapshotStore.addSnapshot(persistenceId, snapshot); // add more entries after snapshot is taken List entries = new ArrayList<>(); @@ -394,11 +411,11 @@ public class RaftActorTest extends AbstractActorTest { int lastAppliedToState = 5; int lastIndex = 7; - MockAkkaJournal.addToJournal(5, entry2); + InMemoryJournal.addEntry(persistenceId, 5, entry2); // 2 entries are applied to state besides the 4 entries in snapshot - MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); - MockAkkaJournal.addToJournal(7, entry3); - MockAkkaJournal.addToJournal(8, entry4); + InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState)); + InMemoryJournal.addEntry(persistenceId, 7, entry3); + InMemoryJournal.addEntry(persistenceId, 8, entry4); // kill the actor followerActor.tell(PoisonPill.getInstance(), null); @@ -423,6 +440,46 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + // Setup the persisted journal with some entries + ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload("zero")); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("oen")); + ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + new MockRaftActorContext.MockPayload("two")); + + long seqNr = 1; + InMemoryJournal.addEntry(persistenceId, seqNr++, entry0); + InMemoryJournal.addEntry(persistenceId, seqNr++, entry1); + InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1)); + InMemoryJournal.addEntry(persistenceId, seqNr++, entry2); + + int lastAppliedToState = 1; + int lastIndex = 2; + + //reinstate the actor + TestActorRef leaderActor = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), + Optional.of(config))); + + leaderActor.underlyingActor().waitForRecoveryComplete(); + + RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext(); + assertEquals("Journal log size", 3, context.getReplicatedLog().size()); + assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); + assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); + }}; + } + /** * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does * process recovery messages @@ -471,7 +528,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 2, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -481,7 +538,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 1, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); @@ -538,7 +595,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -546,13 +603,12 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - }}; } @@ -576,12 +632,12 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar"); assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); - } - }; } @@ -602,14 +658,14 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)); mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry); verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class)); - } - }; } @@ -630,19 +686,21 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + + mockRaftActor.waitUntilLeader(); + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); - verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); - + verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class)); } - }; } @Test - public void testApplyLogEntriesCallsDataPersistence() throws Exception { + public void testApplyJournalEntriesCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { String persistenceId = factory.generateActorId("leader-"); @@ -658,9 +716,13 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - mockRaftActor.onReceiveCommand(new ApplyLogEntries(10)); + mockRaftActor.waitForInitializeBehaviorComplete(); + + mockRaftActor.waitUntilLeader(); - verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); + + verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); } @@ -685,16 +747,20 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); - RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, -1, + new MockRaftActorContext.MockPayload("D")), -1); + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -718,15 +784,18 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + ImmutableMap.of("leader", "fake/path"), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(lastEntry); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -737,7 +806,9 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); + long replicatedToAllIndex = 1; + + mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); verify(mockRaftActor.delegate).createSnapshot(); @@ -749,13 +820,16 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).deleteMessages(100); - assertEquals(2, mockRaftActor.getReplicatedLog().size()); + assertEquals(3, mockRaftActor.getReplicatedLog().size()); + assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex()); + assertNotNull(mockRaftActor.getReplicatedLog().get(2)); assertNotNull(mockRaftActor.getReplicatedLog().get(3)); assertNotNull(mockRaftActor.getReplicatedLog().get(4)); // Index 2 will not be in the log because it was removed due to snapshotting - assertNull(mockRaftActor.getReplicatedLog().get(2)); + assertNull(mockRaftActor.getReplicatedLog().get(1)); + assertNull(mockRaftActor.getReplicatedLog().get(0)); } }; @@ -779,6 +853,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); @@ -807,6 +883,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog(); oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); @@ -860,6 +938,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), @@ -870,7 +950,9 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("D")), 1); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -885,39 +967,121 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testRaftRoleChangeNotifier() throws Exception { + public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(20); + String persistenceId = factory.generateActorId("notifier-"); - factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), notifierActor, + new NonPersistentDataProvider()), persistenceId); - // sleeping for a minimum of 2 seconds, if it spans more its fine. - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); - List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); - assertNotNull(matches); - assertEquals(3, matches.size()); // check if the notifier got a role change from null to Follower - RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); + RoleChanged raftRoleChanged = matches.get(0); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertNull(raftRoleChanged.getOldRole()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Follower to Candidate - raftRoleChanged = (RoleChanged) matches.get(1); + raftRoleChanged = matches.get(1); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Candidate to Leader - raftRoleChanged = (RoleChanged) matches.get(2); + raftRoleChanged = matches.get(2); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + + assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + + notifierActor.underlyingActor().clear(); + + MockRaftActor raftActor = raftActorRef.underlyingActor(); + final String newLeaderId = "new-leader"; + Follower follower = new Follower(raftActor.getRaftActorContext()) { + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + leaderId = newLeaderId; + return this; + } + }; + + raftActor.changeCurrentBehavior(follower); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(null, leaderStateChange.getLeaderId()); + + raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(newLeaderId, leaderStateChange.getLeaderId()); + }}; + } + + @Test + public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(1); + + String persistenceId = factory.generateActorId("notifier-"); + + factory.createActor(MockRaftActor.props(persistenceId, + ImmutableMap.of("leader", "fake/path"), Optional.of(config), notifierActor), persistenceId); + + List matches = null; + for(int i = 0; i < 5000 / heartBeatInterval; i++) { + matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + if(matches.size() == 3) { + break; + } + Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); + } + + assertEquals(2, matches.size()); + + // check if the notifier got a role change from null to Follower + RoleChanged raftRoleChanged = matches.get(0); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertNull(raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Follower to Candidate + raftRoleChanged = matches.get(1); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + }}; } @@ -940,11 +1104,12 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = new HashMap<>(); peerAddresses.put(follower1Id, followerActor1.path().toString()); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), + TestActorRef mockActorRef = factory.createTestActor( MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(4); leaderActor.getRaftActorContext().setLastApplied(4); leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); @@ -962,8 +1127,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1)); - leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + leaderActor.getRaftActorContext().getSnapshotManager() + .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("x")), 4); + verify(leaderActor.delegate).createSnapshot(); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -989,8 +1156,14 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider() + , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); + + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1029,7 +1202,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = new HashMap<>(); peerAddresses.put(leaderId, leaderActor1.path().toString()); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), + TestActorRef mockActorRef = factory.createTestActor( MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config), dataPersistenceProvider), persistenceId); @@ -1040,20 +1213,23 @@ public class RaftActorTest extends AbstractActorTest { followerActor.waitForInitializeBehaviorComplete(); - // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); followerActor.setCurrentBehavior(follower); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); - // log as indices 0-5 + // log has indices 0-5 assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1)); - followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + followerActor.getRaftActorContext().getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("D")), 4); + verify(followerActor.delegate).createSnapshot(); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -1088,9 +1264,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); - // capture snapshot reply should remove the snapshotted entries only + // The commit is needed to complete the snapshot creation process + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); + + // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log assertEquals(7, followerActor.getReplicatedLog().lastIndex()); @@ -1150,16 +1329,22 @@ public class RaftActorTest extends AbstractActorTest { // create 5 entries in the log MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build()); + //set the snapshot index to 4 , 0 to 4 are snapshotted leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); + //setting replicatedToAllIndex = 9, for the log to clear + leader.setReplicatedToAllIndex(9); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // set the 2nd follower nextIndex to 1 which has been snapshotted leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // simulate a real snapshot leaderActor.onReceiveCommand(new SendHeartBeat()); @@ -1180,9 +1365,9 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); - assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size()); + assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1)); @@ -1191,6 +1376,96 @@ public class RaftActorTest extends AbstractActorTest { }; } + @Test + public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + for(int i=0;i< 4;i++) { + leaderActor.getReplicatedLog() + .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i, + new MockRaftActorContext.MockPayload("A"))); + } + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + assertEquals(-1, leader.getReplicatedToAllIndex()); + + }}; + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + leaderActor.getReplicatedLog().setSnapshotIndex(3); + + leaderActor.waitForInitializeBehaviorComplete(); + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + leader.setReplicatedToAllIndex(3); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + assertEquals(3, leader.getReplicatedToAllIndex()); + + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null;