package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
*/
public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorIntegrationTest {
- private List<ReplicatedLogImplEntry> origLeaderJournal;
+ private List<SimpleReplicatedLogEntry> origLeaderJournal;
private MockPayload recoveredPayload0;
private MockPayload recoveredPayload1;
private MockPayload payload7;
@Test
- public void runTest() throws Exception {
+ public void runTest() {
testLog.info("testReplicationAndSnapshots starting");
// Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
long seqId = 1;
InMemoryJournal.addEntry(leaderId, seqId++, new UpdateElectionTerm(initialTerm, leaderId));
recoveredPayload0 = new MockPayload("zero");
- InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(0, initialTerm, recoveredPayload0));
+ InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(0, initialTerm, recoveredPayload0));
recoveredPayload1 = new MockPayload("one");
- InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
+ InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(1, initialTerm, recoveredPayload1));
recoveredPayload2 = new MockPayload("two");
- InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
+ InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(2, initialTerm, recoveredPayload2));
InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
- origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
+ origLeaderJournal = InMemoryJournal.get(leaderId, SimpleReplicatedLogEntry.class);
// Create the leader and 2 follower actors and verify initial syncing of the followers after leader
// persistence recovery.
+ DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
+ followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
- follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
+ follower2Id, testActorPath(follower2Id)), followerConfigParams);
follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
- follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
+ follower1Id, testActorPath(follower1Id)), followerConfigParams);
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).build();
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
* 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
* scenario, the follower consensus and application of state is delayed until after the snapshot
* completes.
- * @throws Exception
*/
- private void testFirstSnapshot() throws Exception {
+ private void testFirstSnapshot() {
testLog.info("testFirstSnapshot starting");
expSnapshotState.add(recoveredPayload0);
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
// The leader's persisted journal log should be cleared since we snapshotted.
- List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
+ List<SimpleReplicatedLogEntry> persistedLeaderJournal =
+ InMemoryJournal.get(leaderId, SimpleReplicatedLogEntry.class);
assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
// Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
+ // The followers should also snapshot so verify.
+
+ MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SaveSnapshotSuccess.class);
+ persistedSnapshots = InMemorySnapshotStore.getSnapshots(follower1Id, Snapshot.class);
+ assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
+ // The last applied index in the snapshot may or may not be the last log entry depending on
+ // timing so to avoid intermittent test failures, we'll just verify the snapshot's last term/index.
+ assertEquals("Follower1 Snapshot getLastTerm", currentTerm, persistedSnapshots.get(0).getLastTerm());
+ assertEquals("Follower1 Snapshot getLastIndex", 3, persistedSnapshots.get(0).getLastIndex());
+
+ MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
+
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
/**
* Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
* consensus occurs and the leader applies the state.
- * @throws Exception
*/
- private void testSecondSnapshot() throws Exception {
+ private void testSecondSnapshot() {
testLog.info("testSecondSnapshot starting");
expSnapshotState.add(payload3);
payload7 = sendPayloadData(leaderActor, "seven");
// Capture the CaptureSnapshotReply message so we can send it later.
- CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
- CaptureSnapshotReply.class);
+ final CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshotReply.class);
// Wait for the state to be applied in the leader.
ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
// The leader's persisted journal log should be cleared since we did a snapshot.
- List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(
- leaderId, ReplicatedLogImplEntry.class);
+ List<SimpleReplicatedLogEntry> persistedLeaderJournal = InMemoryJournal.get(
+ leaderId, SimpleReplicatedLogEntry.class);
assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
// Verify the followers apply all 4 new log entries.
- List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 4);
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor,
+ ApplyState.class, 4);
verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4);
verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5);
verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
- RaftActorContext follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+ follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm());
assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
MessageCollectorActor.clearMessages(follower2CollectorActor);
MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class);
- RaftActorContext follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+ follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm());
assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size());