import java.util.List;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
// Create the leader and 2 follower actors and verify initial syncing of the followers after leader
// persistence recovery.
+ DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
+ followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
- follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
+ follower2Id, testActorPath(follower2Id)), followerConfigParams);
follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
- follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
+ follower1Id, testActorPath(follower1Id)), followerConfigParams);
peerAddresses = ImmutableMap.<String, String>builder().
put(follower1Id, follower1Actor.path().toString()).
assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
+ // The followers should also snapshot so verify.
+
+ MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SaveSnapshotSuccess.class);
+ persistedSnapshots = InMemorySnapshotStore.getSnapshots(follower1Id, Snapshot.class);
+ assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
+ // The last applied index in the snapshot may or may not be the last log entry depending on
+ // timing so to avoid intermittent test failures, we'll just verify the snapshot's last term/index.
+ assertEquals("Follower1 Snapshot getLastTerm", currentTerm, persistedSnapshots.get(0).getLastTerm());
+ assertEquals("Follower1 Snapshot getLastIndex", 3, persistedSnapshots.get(0).getLastIndex());
+
+ MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
+
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
expSnapshotState.add(payload6);
// Delay the CaptureSnapshot message to the leader actor.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
// Send the payload.
payload7 = sendPayloadData(leaderActor, "seven");
- // Capture the CaptureSnapshot message so we can send it later.
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
+ // Capture the CaptureSnapshotReply message so we can send it later.
+ CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
+ CaptureSnapshotReply.class);
// Wait for the state to be applied in the leader.
ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
- // Now deliver the CaptureSnapshot.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
- // Wait for CaptureSnapshotReply to complete.
- MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
+ // Now deliver the CaptureSnapshotReply.
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+ leaderActor.tell(captureSnapshotReply, leaderActor);
// Wait for snapshot complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
- expSnapshotState.add(payload7);
-
// Verify the persisted snapshot. This should reflect the snapshot index as the last applied
// log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
// when the snapshot is created (ie when the CaptureSnapshot is processed).
assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
+ expSnapshotState.add(payload7);
+
testLog.info("testSecondSnapshot ending");
}