X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=34932c7249a38f753856cdb99addf72d221f0722;hp=56bfc21f23c2047a19d5a49c49861ae1bf6eca5e;hb=08dd5c2c443ff53f56af88a0e8dc8f34e36d2245;hpb=32b25203819eb02df22abfecdcc86896c068f778 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 56bfc21f23..34932c7249 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -54,7 +54,9 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -63,14 +65,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; -import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -88,13 +91,13 @@ public class RaftActorTest extends AbstractActorTest { @After public void tearDown() throws Exception { factory.close(); - MockAkkaJournal.clearJournal(); - MockSnapshotStore.setMockSnapshot(null); + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } public static class MockRaftActor extends RaftActor { - private final DataPersistenceProvider dataPersistenceProvider; + protected DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; @@ -177,7 +180,7 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); - LOG.info("applyState called"); + LOG.info("{}: applyState called", persistenceId()); } @Override @@ -219,10 +222,12 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void createSnapshot() { + LOG.info("{}: createSnapshot called", persistenceId()); delegate.createSnapshot(); } @Override protected void applySnapshot(byte [] snapshot) { + LOG.info("{}: applySnapshot called", persistenceId()); delegate.applySnapshot(snapshot); } @@ -270,7 +275,7 @@ public class RaftActorTest extends AbstractActorTest { } - private static class RaftActorTestKit extends JavaTestKit { + public static class RaftActorTestKit extends JavaTestKit { private final ActorRef raftActor; public RaftActorTestKit(ActorSystem actorSystem, String actorName) { @@ -306,7 +311,7 @@ public class RaftActorTest extends AbstractActorTest { waitUntilLeader(raftActor); } - protected void waitUntilLeader(ActorRef actorRef) { + public static void waitUntilLeader(ActorRef actorRef) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration)); @@ -376,8 +381,7 @@ public class RaftActorTest extends AbstractActorTest { Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1); - MockSnapshotStore.setMockSnapshot(snapshot); - MockSnapshotStore.setPersistenceId(persistenceId); + InMemorySnapshotStore.addSnapshot(persistenceId, snapshot); // add more entries after snapshot is taken List entries = new ArrayList<>(); @@ -394,11 +398,11 @@ public class RaftActorTest extends AbstractActorTest { int lastAppliedToState = 5; int lastIndex = 7; - MockAkkaJournal.addToJournal(5, entry2); + InMemoryJournal.addEntry(persistenceId, 5, entry2); // 2 entries are applied to state besides the 4 entries in snapshot - MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); - MockAkkaJournal.addToJournal(7, entry3); - MockAkkaJournal.addToJournal(8, entry4); + InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState)); + InMemoryJournal.addEntry(persistenceId, 7, entry3); + InMemoryJournal.addEntry(persistenceId, 8, entry4); // kill the actor followerActor.tell(PoisonPill.getInstance(), null); @@ -423,6 +427,46 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + // Setup the persisted journal with some entries + ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload("zero")); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("oen")); + ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + new MockRaftActorContext.MockPayload("two")); + + long seqNr = 1; + InMemoryJournal.addEntry(persistenceId, seqNr++, entry0); + InMemoryJournal.addEntry(persistenceId, seqNr++, entry1); + InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1)); + InMemoryJournal.addEntry(persistenceId, seqNr++, entry2); + + int lastAppliedToState = 1; + int lastIndex = 2; + + //reinstate the actor + TestActorRef leaderActor = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), + Optional.of(config))); + + leaderActor.underlyingActor().waitForRecoveryComplete(); + + RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext(); + assertEquals("Journal log size", 3, context.getReplicatedLog().size()); + assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); + assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); + }}; + } + /** * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does * process recovery messages @@ -471,7 +515,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 2, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -538,7 +582,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("add replicated log entry", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1)); assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex()); @@ -641,7 +685,7 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testApplyLogEntriesCallsDataPersistence() throws Exception { + public void testApplyJournalEntriesCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { String persistenceId = factory.generateActorId("leader-"); @@ -659,7 +703,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); - mockRaftActor.onReceiveCommand(new ApplyLogEntries(10)); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); @@ -902,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftRoleChangeNotifier() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); MessageCollectorActor.waitUntilReady(notifierActor); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -912,20 +957,10 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - factory.createTestActor(MockRaftActor.props(persistenceId, + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); - List matches = null; - for(int i = 0; i < 5000 / heartBeatInterval; i++) { - matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); - assertNotNull(matches); - if(matches.size() == 3) { - break; - } - Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); - } - - assertEquals(3, matches.size()); + List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); // check if the notifier got a role change from null to Follower RoleChanged raftRoleChanged = matches.get(0); @@ -944,6 +979,41 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + + assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + + notifierActor.underlyingActor().clear(); + + MockRaftActor raftActor = raftActorRef.underlyingActor(); + final String newLeaderId = "new-leader"; + Follower follower = new Follower(raftActor.getRaftActorContext()) { + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + leaderId = newLeaderId; + return this; + } + }; + + raftActor.changeCurrentBehavior(follower); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(null, leaderStateChange.getLeaderId()); + + raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(newLeaderId, leaderStateChange.getLeaderId()); }}; }