import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
MockAkkaJournal.addToJournal(5, entry2);
// 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+ MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
MockAkkaJournal.addToJournal(7, entry3);
MockAkkaJournal.addToJournal(8, entry4);
+
// kill the actor
followerActor.tell(PoisonPill.getInstance(), null);
expectMsgClass(duration("5 seconds"), Terminated.class);
}};
}
+ @Test
+ public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("leader-");
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ // Setup the persisted journal with some entries
+ ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new MockRaftActorContext.MockPayload("zero"));
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ new MockRaftActorContext.MockPayload("oen"));
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+ new MockRaftActorContext.MockPayload("two"));
+
+ long seqNr = 1;
+ MockAkkaJournal.addToJournal(seqNr++, entry0);
+ MockAkkaJournal.addToJournal(seqNr++, entry1);
+ MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
+ MockAkkaJournal.addToJournal(seqNr++, entry2);
+
+ int lastAppliedToState = 1;
+ int lastIndex = 2;
+
+ //reinstate the actor
+ TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
+ MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
+ Optional.<ConfigParams>of(config)));
+
+ leaderActor.underlyingActor().waitForRecoveryComplete();
+
+ RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
+ assertEquals("Journal log size", 3, context.getReplicatedLog().size());
+ assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+ }};
+ }
+
/**
* This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
* process recovery messages
assertEquals("add replicated log entry", 2, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+ mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
assertEquals("add replicated log entry", 0, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+ mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
}
@Test
- public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+ public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
new JavaTestKit(getSystem()) {
{
String persistenceId = factory.generateActorId("leader-");
mockRaftActor.waitForInitializeBehaviorComplete();
- mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+ mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));