- new Within(duration("1 seconds")) {
- protected void run() {
-
- String persistenceId = "follower10";
-
- ActorRef followerActor = getSystem().actorOf(
- MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
-
- List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
- ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
- snapshotUnappliedEntries.add(entry1);
-
- int lastAppliedDuringSnapshotCapture = 3;
- int lastIndexDuringSnapshotCapture = 4;
-
- ByteString snapshotBytes = null;
- try {
- // 4 messages as part of snapshot, which are applied to state
- snapshotBytes = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
- } catch (Exception e) {
- e.printStackTrace();
- }
- Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
- snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
- lastAppliedDuringSnapshotCapture, 1);
- MockSnapshotStore.setMockSnapshot(snapshot);
- MockSnapshotStore.setPersistenceId(persistenceId);
-
- // add more entries after snapshot is taken
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
- ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
- ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
- entries.add(entry2);
- entries.add(entry3);
- entries.add(entry4);
-
- int lastAppliedToState = 5;
- int lastIndex = 7;
-
- MockAkkaJournal.addToJournal(5, entry2);
- // 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
- MockAkkaJournal.addToJournal(7, entry3);
- MockAkkaJournal.addToJournal(8, entry4);
-
- // kill the actor
- followerActor.tell(PoisonPill.getInstance(), null);
-
- try {
- // give some time for actor to die
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- //reinstate the actor
- TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
- MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
-
- try {
- //give some time for snapshot offer to get called.
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- RaftActorContext context = ref.underlyingActor().getRaftActorContext();
- assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
- assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
- assertEquals(lastAppliedToState, context.getLastApplied());
- assertEquals(lastAppliedToState, context.getCommitIndex());
- assertTrue(ref.underlyingActor().isApplySnapshotCalled());
- assertEquals(6, ref.underlyingActor().getState().size());
- }
- };
+ String persistenceId = "follower10";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ // Set the heartbeat interval high to essentially disable election otherwise the test
+ // may fail if the actor is switched to Leader and the commitIndex is set to the last
+ // log entry.
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+ watch(followerActor);
+
+ List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+ new MockRaftActorContext.MockPayload("E"));
+ snapshotUnappliedEntries.add(entry1);
+
+ int lastAppliedDuringSnapshotCapture = 3;
+ int lastIndexDuringSnapshotCapture = 4;
+
+ // 4 messages as part of snapshot, which are applied to state
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+ snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+ lastAppliedDuringSnapshotCapture, 1);
+ MockSnapshotStore.setMockSnapshot(snapshot);
+ MockSnapshotStore.setPersistenceId(persistenceId);
+
+ // add more entries after snapshot is taken
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("F"));
+ ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("G"));
+ ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("H"));
+ entries.add(entry2);
+ entries.add(entry3);
+ entries.add(entry4);
+
+ int lastAppliedToState = 5;
+ int lastIndex = 7;
+
+ MockAkkaJournal.addToJournal(5, entry2);
+ // 2 entries are applied to state besides the 4 entries in snapshot
+ MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+ MockAkkaJournal.addToJournal(7, entry3);
+ MockAkkaJournal.addToJournal(8, entry4);
+
+ // kill the actor
+ followerActor.tell(PoisonPill.getInstance(), null);
+ expectMsgClass(duration("5 seconds"), Terminated.class);
+
+ unwatch(followerActor);
+
+ //reinstate the actor
+ TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
+ Optional.<ConfigParams>of(config)));
+
+ ref.underlyingActor().waitForRecoveryComplete();
+
+ RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+ assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+ context.getReplicatedLog().size());
+ assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+ assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());