X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=48b555faff286c0393cb2c6ec39030c651751822;hp=941deb5843e2749a08232ba8b1cf5a55921419fb;hb=93e6f3bfc003d4ce2d968761dff963615a0b799d;hpb=f1c3050779d7770ef6a12a67a1870765c3dfd9eb diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 941deb5843..48b555faff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -62,13 +62,15 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; @@ -83,6 +85,7 @@ import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolic import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -134,7 +137,7 @@ public class RaftActorTest extends AbstractActorTest { ImmutableMap peerAddresses = ImmutableMap.builder().put("member1", "address").build(); ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId, - peerAddresses, Optional.of(config)), persistenceId); + peerAddresses, config), persistenceId); watch(followerActor); @@ -187,7 +190,7 @@ public class RaftActorTest extends AbstractActorTest { //reinstate the actor TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config))); + MockRaftActor.props(persistenceId, peerAddresses, config)); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -221,7 +224,7 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef ref = factory.createTestActor(MockRaftActor.props(persistenceId, ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config), new NonPersistentDataProvider()), persistenceId); + config, new NonPersistentDataProvider()), persistenceId); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -245,7 +248,7 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef ref = factory.createTestActor(MockRaftActor.props(persistenceId, ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config), new NonPersistentDataProvider()). + config, new NonPersistentDataProvider()). withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); InMemoryJournal.waitForWriteMessagesComplete(persistenceId); @@ -257,8 +260,8 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); ref = factory.createTestActor(MockRaftActor.props(persistenceId, - ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config), new NonPersistentDataProvider()). + ImmutableMap.builder().put("member1", "address").build(), config, + new NonPersistentDataProvider()). withDispatcher(Dispatchers.DefaultDispatcherId()), factory.generateActorId("follower-")); @@ -284,7 +287,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config)), persistenceId); + Collections.emptyMap(), config), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -305,9 +308,6 @@ public class RaftActorTest extends AbstractActorTest { ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2); mockRaftActor.handleRecover(applyJournalEntries); - ApplyLogEntries applyLogEntries = new ApplyLogEntries(0); - mockRaftActor.handleRecover(applyLogEntries); - DeleteEntries deleteEntries = new DeleteEntries(1); mockRaftActor.handleRecover(deleteEntries); @@ -325,7 +325,6 @@ public class RaftActorTest extends AbstractActorTest { verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class)); - verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class)); @@ -342,8 +341,8 @@ public class RaftActorTest extends AbstractActorTest { RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class); - TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), mockSupport), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).snapshotMessageSupport(mockSupport).props()); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -362,11 +361,11 @@ public class RaftActorTest extends AbstractActorTest { doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class)); mockRaftActor.handleCommand(captureSnapshotReply); - SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class)); + SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L)); doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class)); mockRaftActor.handleCommand(saveSnapshotSuccess); - SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable()); + SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable()); doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class)); mockRaftActor.handleCommand(saveSnapshotFailure); @@ -400,7 +399,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), config, dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -431,7 +430,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), config, dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -440,9 +439,10 @@ public class RaftActorTest extends AbstractActorTest { ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); - mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); + final Identifier id = new MockIdentifier("apply-state"); + mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry)); - verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject()); } }; @@ -462,9 +462,10 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), notifierActor, - new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider( + new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + persistenceId); List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); @@ -501,7 +502,7 @@ public class RaftActorTest extends AbstractActorTest { Follower follower = new Follower(raftActor.getRaftActorContext()) { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object message) { - leaderId = newLeaderId; + setLeaderId(newLeaderId); setLeaderPayloadVersion(newLeaderVersion); return this; } @@ -549,8 +550,9 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - factory.createActor(MockRaftActor.props(persistenceId, - ImmutableMap.of("leader", "fake/path"), Optional.of(config), notifierActor), persistenceId); + factory.createActor(MockRaftActor.builder().id(persistenceId). + peerAddresses(ImmutableMap.of("leader", "fake/path")). + config(config).roleChangeNotifier(notifierActor).props()); List matches = null; for(int i = 0; i < 5000 / heartBeatInterval; i++) { @@ -600,8 +602,7 @@ public class RaftActorTest extends AbstractActorTest { peerAddresses.put(follower1Id, followerActor1.path().toString()); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); @@ -653,12 +654,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(), - leader, Runtime.getRuntime().totalMemory()); + Runtime.getRuntime().totalMemory()); assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader); + leaderActor.getRaftActorContext().getSnapshotManager().commit(-1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -698,8 +699,7 @@ public class RaftActorTest extends AbstractActorTest { peerAddresses.put(leaderId, leaderActor1.path().toString()); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor followerActor = mockActorRef.underlyingActor(); followerActor.getRaftActorContext().setCommitIndex(4); @@ -762,7 +762,7 @@ public class RaftActorTest extends AbstractActorTest { assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower); + followerActor.getRaftActorContext().getSnapshotManager().commit(-1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -807,8 +807,7 @@ public class RaftActorTest extends AbstractActorTest { peerAddresses.put(follower2Id, followerActor2.path().toString()); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); leaderActor.getRaftActorContext().setCommitIndex(9); @@ -842,7 +841,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // simulate a real snapshot - leaderActor.onReceiveCommand(new SendHeartBeat()); + leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ", leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()) @@ -885,8 +884,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = ImmutableMap.builder().put("member1", "address").build(); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); leaderActor.getRaftActorContext().setCommitIndex(3); @@ -933,8 +931,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = ImmutableMap.builder().put("member1", "address").build(); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); leaderActor.getRaftActorContext().setCommitIndex(3); @@ -949,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // Persist another entry (this will cause a CaptureSnapshot to be triggered - leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + leaderActor.persistData(mockActorRef, new MockIdentifier("x"), + new MockRaftActorContext.MockPayload("duh")); // Now send a CaptureSnapshotReply mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); @@ -984,7 +982,7 @@ public class RaftActorTest extends AbstractActorTest { InMemoryJournal.addEntry(persistenceId, 1, replLogEntry); TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config))); + MockRaftActor.props(persistenceId, peerAddresses, config)); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -1010,8 +1008,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = ImmutableMap.builder().build(); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); @@ -1067,8 +1064,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef actorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(emptyConfig), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = actorRef.underlyingActor(); mockRaftActor.waitForInitializeBehaviorComplete(); @@ -1129,7 +1125,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - ImmutableMap.builder().put("member1", "address").build(), Optional.of(config)). + ImmutableMap.builder().put("member1", "address").build(), config). withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); @@ -1195,4 +1191,180 @@ public class RaftActorTest extends AbstractActorTest { TEST_LOG.info("testGetSnapshot ending"); } + + @Test + public void testRestoreFromSnapshot() throws Exception { + TEST_LOG.info("testRestoreFromSnapshot starting"); + + String persistenceId = factory.generateActorId("test-actor-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + List snapshotUnappliedEntries = new ArrayList<>(); + snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, + new MockRaftActorContext.MockPayload("E"))); + + int snapshotLastApplied = 3; + int snapshotLastIndex = 4; + + List state = Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D")); + ByteString stateBytes = fromObject(state); + + Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries, + snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1"); + + InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForRecoveryComplete(); + + Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class); + assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor()); + assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex()); + assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm()); + assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex()); + assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm()); + assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState()); + assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries()); + + verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class)); + + RaftActorContext context = mockRaftActor.getRaftActorContext(); + assertEquals("Journal log size", 1, context.getReplicatedLog().size()); + assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", snapshotLastApplied, context.getLastApplied()); + assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex()); + assertEquals("Recovered state", state, mockRaftActor.getState()); + assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor()); + + // Test with data persistence disabled + + snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + -1, -1, -1, -1, 5, "member-1"); + + persistenceId = factory.generateActorId("test-actor-"); + + raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)). + persistent(Optional.of(Boolean.FALSE)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForRecoveryComplete(); + assertEquals("snapshot committed", true, + Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS)); + + context = mockRaftActor.getRaftActorContext(); + assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor()); + + TEST_LOG.info("testRestoreFromSnapshot ending"); + } + + @Test + public void testRestoreFromSnapshotWithRecoveredData() throws Exception { + TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting"); + + String persistenceId = factory.generateActorId("test-actor-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + List state = Arrays.asList(new MockRaftActorContext.MockPayload("A")); + Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.asList(), + 5, 2, 5, 2, 2, "member-1"); + + InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload("B"))); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForRecoveryComplete(); + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class)); + + RaftActorContext context = mockRaftActor.getRaftActorContext(); + assertEquals("Journal log size", 1, context.getReplicatedLog().size()); + assertEquals("Last index", 0, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", -1, context.getLastApplied()); + assertEquals("Commit index", -1, context.getCommitIndex()); + assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted for", null, context.getTermInformation().getVotedFor()); + + TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending"); + } + + @Test + public void testNonVotingOnRecovery() throws Exception { + TEST_LOG.info("testNonVotingOnRecovery starting"); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setElectionTimeoutFactor(1); + config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS)); + + String persistenceId = factory.generateActorId("test-actor-"); + InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false))))); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForInitializeBehaviorComplete(); + + // Sleep a bit and verify it didn't get an election timeout and schedule an election. + + Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS); + assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState()); + + TEST_LOG.info("testNonVotingOnRecovery ending"); + } + + @Test + public void testLeaderTransitioning() throws Exception { + TEST_LOG.info("testLeaderTransitioning starting"); + + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + String persistenceId = factory.generateActorId("test-actor-"); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForInitializeBehaviorComplete(); + + raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.emptyList(), + 0L, -1L, (short)1), ActorRef.noSender()); + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId()); + + MessageCollectorActor.clearMessages(notifierActor); + + raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender()); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId()); + assertEquals("getLeaderId", null, leaderStateChange.getLeaderId()); + + TEST_LOG.info("testLeaderTransitioning ending"); + } }