X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=c0bdc53c51f27907da2de0e2399d619cee8fdda1;hp=83868b6a2ad187a257bc3323479b8d84974276a5;hb=f78b7f15e24efdb5dd9f91b487bc63dad7517b1c;hpb=8f260b29ae2a1b6ee75712f74e44996a3a3b02f0 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 83868b6a2a..c0bdc53c51 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -55,6 +55,7 @@ import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -396,10 +397,11 @@ public class RaftActorTest extends AbstractActorTest { MockAkkaJournal.addToJournal(5, entry2); // 2 entries are applied to state besides the 4 entries in snapshot - MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); + MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState)); MockAkkaJournal.addToJournal(7, entry3); MockAkkaJournal.addToJournal(8, entry4); + // kill the actor followerActor.tell(PoisonPill.getInstance(), null); expectMsgClass(duration("5 seconds"), Terminated.class); @@ -423,6 +425,46 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + // Setup the persisted journal with some entries + ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload("zero")); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("oen")); + ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + new MockRaftActorContext.MockPayload("two")); + + long seqNr = 1; + MockAkkaJournal.addToJournal(seqNr++, entry0); + MockAkkaJournal.addToJournal(seqNr++, entry1); + MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1)); + MockAkkaJournal.addToJournal(seqNr++, entry2); + + int lastAppliedToState = 1; + int lastIndex = 2; + + //reinstate the actor + TestActorRef leaderActor = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), + Optional.of(config))); + + leaderActor.underlyingActor().waitForRecoveryComplete(); + + RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext(); + assertEquals("Journal log size", 3, context.getReplicatedLog().size()); + assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); + assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); + }}; + } + /** * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does * process recovery messages @@ -471,7 +513,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 2, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -538,7 +580,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -641,7 +683,7 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testApplyLogEntriesCallsDataPersistence() throws Exception { + public void testApplyJournalEntriesCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { String persistenceId = factory.generateActorId("leader-"); @@ -659,7 +701,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); - mockRaftActor.onReceiveCommand(new ApplyLogEntries(10)); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); @@ -1227,6 +1269,128 @@ public class RaftActorTest extends AbstractActorTest { }; } + + private static class NonPersistentProvider implements DataPersistenceProvider { + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + try { + procedure.apply(o); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void saveSnapshot(Object o) { + + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + + } + + @Override + public void deleteMessages(long sequenceNumber) { + + } + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + for(int i=0;i< 4;i++) { + leaderActor.getReplicatedLog() + .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i, + new MockRaftActorContext.MockPayload("A"))); + } + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(-1, leader.getReplicatedToAllIndex()); + + }}; + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + leaderActor.getReplicatedLog().setSnapshotIndex(3); + + leaderActor.waitForInitializeBehaviorComplete(); + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + leader.setReplicatedToAllIndex(3); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(3, leader.getReplicatedToAllIndex()); + + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null;