- public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
- String follower1Id = factory.generateActorId("follower-");
- String follower2Id = factory.generateActorId("follower-");
-
- ActorRef followerActor1 =
- factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
- ActorRef followerActor2 =
- factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1Id, followerActor1.path().toString());
- peerAddresses.put(follower2Id, followerActor2.path().toString());
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses,
- Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
- MockRaftActor leaderActor = mockActorRef.underlyingActor();
- leaderActor.getRaftActorContext().setCommitIndex(9);
- leaderActor.getRaftActorContext().setLastApplied(9);
- leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
-
- leaderActor.waitForInitializeBehaviorComplete();
-
- Leader leader = new Leader(leaderActor.getRaftActorContext());
- leaderActor.setCurrentBehavior(leader);
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // create 5 entries in the log
- MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
- leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
-
- //set the snapshot index to 4 , 0 to 4 are snapshotted
- leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
- //setting replicatedToAllIndex = 9, for the log to clear
- leader.setReplicatedToAllIndex(9);
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // set the 2nd follower nextIndex to 1 which has been snapshotted
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // simulate a real snapshot
- leaderActor.onReceiveCommand(new SendHeartBeat());
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
- leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
- , RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
-
- //reply from a slow follower does not initiate a fake snapshot
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
- assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
-
- ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("foo-0"),
- new MockRaftActorContext.MockPayload("foo-1"),
- new MockRaftActorContext.MockPayload("foo-2"),
- new MockRaftActorContext.MockPayload("foo-3"),
- new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
-
- assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
-
- //reply from a slow follower after should not raise errors
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
- assertEquals(0, leaderActor.getReplicatedLog().size());
- }
- };
+ public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
+ TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
+
+ String persistenceId = factory.generateActorId("test-actor-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
+ Snapshot snapshot = Snapshot.create(ByteState.of(fromObject(state).toByteArray()),
+ Arrays.<ReplicatedLogEntry>asList(), 5, 2, 5, 2, 2, "member-1", null);
+
+ InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1,
+ new MockRaftActorContext.MockPayload("B")));
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
+ .config(config).restoreFromSnapshot(snapshot).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+ mockRaftActor.waitForRecoveryComplete();
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(Snapshot.State.class));
+
+ RaftActorContext context = mockRaftActor.getRaftActorContext();
+ assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+ assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", -1, context.getLastApplied());
+ assertEquals("Commit index", -1, context.getCommitIndex());
+ assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
+ assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
+
+ TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");