import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
-import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
@After
public void tearDown() throws Exception {
factory.close();
- MockAkkaJournal.clearJournal();
- MockSnapshotStore.setMockSnapshot(null);
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
public static class MockRaftActor extends RaftActor {
- private final DataPersistenceProvider dataPersistenceProvider;
+ protected DataPersistenceProvider dataPersistenceProvider;
private final RaftActor delegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
delegate.applyState(clientActor, identifier, data);
- LOG.info("applyState called");
+ LOG.info("{}: applyState called", persistenceId());
}
@Override
}
@Override protected void createSnapshot() {
+ LOG.info("{}: createSnapshot called", persistenceId());
delegate.createSnapshot();
}
@Override protected void applySnapshot(byte [] snapshot) {
+ LOG.info("{}: applySnapshot called", persistenceId());
delegate.applySnapshot(snapshot);
}
}
- private static class RaftActorTestKit extends JavaTestKit {
+ public static class RaftActorTestKit extends JavaTestKit {
private final ActorRef raftActor;
public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
waitUntilLeader(raftActor);
}
- protected void waitUntilLeader(ActorRef actorRef) {
+ public static void waitUntilLeader(ActorRef actorRef) {
FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 20 * 5; i++) {
Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
lastAppliedDuringSnapshotCapture, 1);
- MockSnapshotStore.setMockSnapshot(snapshot);
- MockSnapshotStore.setPersistenceId(persistenceId);
+ InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
// add more entries after snapshot is taken
List<ReplicatedLogEntry> entries = new ArrayList<>();
int lastAppliedToState = 5;
int lastIndex = 7;
- MockAkkaJournal.addToJournal(5, entry2);
+ InMemoryJournal.addEntry(persistenceId, 5, entry2);
// 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
- MockAkkaJournal.addToJournal(7, entry3);
- MockAkkaJournal.addToJournal(8, entry4);
+ InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
+ InMemoryJournal.addEntry(persistenceId, 7, entry3);
+ InMemoryJournal.addEntry(persistenceId, 8, entry4);
// kill the actor
followerActor.tell(PoisonPill.getInstance(), null);
}};
}
+ @Test
+ public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("leader-");
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ // Setup the persisted journal with some entries
+ ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new MockRaftActorContext.MockPayload("zero"));
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ new MockRaftActorContext.MockPayload("oen"));
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+ new MockRaftActorContext.MockPayload("two"));
+
+ long seqNr = 1;
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
+
+ int lastAppliedToState = 1;
+ int lastIndex = 2;
+
+ //reinstate the actor
+ TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
+ MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
+ Optional.<ConfigParams>of(config)));
+
+ leaderActor.underlyingActor().waitForRecoveryComplete();
+
+ RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
+ assertEquals("Journal log size", 3, context.getReplicatedLog().size());
+ assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+ }};
+ }
+
/**
* This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
* process recovery messages
assertEquals("add replicated log entry", 2, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+ mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
assertEquals("add replicated log entry", 0, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+ mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
}
@Test
- public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+ public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
new JavaTestKit(getSystem()) {
{
String persistenceId = factory.generateActorId("leader-");
mockRaftActor.waitForInitializeBehaviorComplete();
- mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+ mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));