Follow-up patch for https://git.opendaylight.org/gerrit/#/c/21904/ to
use the captured Snapshot's lastAppliedIndex for the lastIncludedIndex
field in the InstallSnapshot message and the follower's matchIndex/nextIndex
once the install completes. This is in lieu of using the leader's snapshotIndex
which typically lags behind the lastAppliedIndex by 1 due to the trimming of
the in-memory log. This avoids the leader sending its last log entry
redundantly after the install completes as the last entry was included
in the snapshot.
Change-Id: Ie821078b4316641b67e1b853b9264353dde6bfae
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit
1dfb0b9105e9eb352ff2263434e79a5433e59e91)
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
import java.util.List;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import java.util.List;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
- Snapshot sn = Snapshot.create(snapshotBytes,
+ Snapshot snapshot = Snapshot.create(snapshotBytes,
captureSnapshot.getUnAppliedEntries(),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
captureSnapshot.getUnAppliedEntries(),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
- context.getPersistenceProvider().saveSnapshot(sn);
+ context.getPersistenceProvider().saveSnapshot(snapshot);
- LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), snapshot.getLogMessage());
long dataThreshold = totalMemory *
context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
long dataThreshold = totalMemory *
context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
if (context.getId().equals(currentBehavior.getLeaderId())
&& captureSnapshot.isInstallSnapshotInitiated()) {
// this would be call straight to the leader and won't initiate in serialization
if (context.getId().equals(currentBehavior.getLeaderId())
&& captureSnapshot.isInstallSnapshotInitiated()) {
// this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
- ByteString.copyFrom(snapshotBytes)));
+ currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot));
}
captureSnapshot = null;
}
captureSnapshot = null;
package org.opendaylight.controller.cluster.raft.base.messages;
package org.opendaylight.controller.cluster.raft.base.messages;
-import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.Snapshot;
public class SendInstallSnapshot {
public class SendInstallSnapshot {
- private ByteString snapshot;
+ private final Snapshot snapshot;
- public SendInstallSnapshot(ByteString snapshot) {
+ public SendInstallSnapshot(Snapshot snapshot) {
this.snapshot = snapshot;
}
this.snapshot = snapshot;
}
- public ByteString getSnapshot() {
+ public Snapshot getSnapshot() {
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
protected final int minIsolatedLeaderPeerCount;
protected final int minIsolatedLeaderPeerCount;
- private Optional<ByteString> snapshot;
+ private Optional<SnapshotHolder> snapshot;
public AbstractLeader(RaftActorContext context) {
super(context, RaftState.Leader);
public AbstractLeader(RaftActorContext context) {
super(context, RaftState.Leader);
- void setSnapshot(Optional<ByteString> snapshot) {
- this.snapshot = snapshot;
+ void setSnapshot(@Nullable Snapshot snapshot) {
+ if(snapshot != null) {
+ this.snapshot = Optional.of(new SnapshotHolder(snapshot));
+ } else {
+ this.snapshot = Optional.absent();
+ }
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
} else if (message instanceof Replicate) {
sendInstallSnapshot();
} else if (message instanceof Replicate) {
- followerLogInformation.setMatchIndex(
- context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation.setNextIndex(
- context.getReplicatedLog().getSnapshotIndex() + 1);
+ long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+ followerLogInformation.setMatchIndex(followerMatchIndex);
+ followerLogInformation.setNextIndex(followerMatchIndex + 1);
mapFollowerToSnapshot.remove(followerId);
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
mapFollowerToSnapshot.remove(followerId);
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
if (mapFollowerToSnapshot.isEmpty()) {
// once there are no pending followers receiving snapshots
// we can remove snapshot from the memory
if (mapFollowerToSnapshot.isEmpty()) {
// once there are no pending followers receiving snapshots
// we can remove snapshot from the memory
- setSnapshot(Optional.<ByteString>absent());
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+ ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
+ snapshot.get().getLastIncludedIndex(),
+ snapshot.get().getLastIncludedTerm(),
nextSnapshotChunk,
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
nextSnapshotChunk,
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
public int followerLogSize() {
return followerToLog.size();
}
public int followerLogSize() {
return followerToLog.size();
}
+
+ private static class SnapshotHolder {
+ private final long lastIncludedTerm;
+ private final long lastIncludedIndex;
+ private final ByteString snapshotBytes;
+
+ SnapshotHolder(Snapshot snapshot) {
+ this.lastIncludedTerm = snapshot.getLastAppliedTerm();
+ this.lastIncludedIndex = snapshot.getLastAppliedIndex();
+ this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+ }
+
+ long getLastIncludedTerm() {
+ return lastIncludedTerm;
+ }
+
+ long getLastIncludedIndex() {
+ return lastIncludedIndex;
+ }
+
+ ByteString getSnapshotBytes() {
+ return snapshotBytes;
+ }
+ }
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
// Wait for the follower to persist the snapshot.
MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
// Wait for the follower to persist the snapshot.
MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
- // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent
- // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's
- // snapshotIndex and not the actual last index included in the snapshot.
- // FIXME? - is this OK?
- MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
- List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2);
+ List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
- assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
killActor(follower2Actor);
assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
killActor(follower2Actor);
assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
- assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
}
assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
}
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
- assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
+ assertEquals("InstallSnapshot getLastIncludedIndex", 9, installSnapshot.getLastIncludedIndex());
//assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
//assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
- verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
+ verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 9, currentTerm, 9);
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
- // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
- applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
- verifyApplyState(applyState, null, null, currentTerm, 9, payload9);
-
// Wait for the snapshot to complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
// Wait for the snapshot to complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
- assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+ assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
assertEquals("getTerm", -1L, reader.getTerm());
assertEquals("getIndex", -1L, reader.getIndex());
}
assertEquals("getTerm", -1L, reader.getTerm());
assertEquals("getIndex", -1L, reader.getIndex());
}
-}
\ No newline at end of file
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int newEntryIndex = 4;
final int snapshotTerm = 1;
final int currentTerm = 2;
final int newEntryIndex = 4;
final int snapshotTerm = 1;
final int currentTerm = 2;
// set the snapshot variables in replicatedlog
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
// set the snapshot variables in replicatedlog
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
//set follower timeout to 2 mins, helps during debugging
actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
leader = new Leader(actorContext);
//set follower timeout to 2 mins, helps during debugging
actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
// new entry
ReplicatedLogImplEntry entry =
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
// new entry
ReplicatedLogImplEntry entry =
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
- assertEquals(snapshotIndex, is.getLastIncludedIndex());
+ assertEquals(commitIndex, is.getLastIncludedIndex());
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(Optional.<ByteString>absent());
+ leader.setSnapshot(null);
// new entry
ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
// new entry
ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int lastAppliedIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(lastAppliedIndex);
+ actorContext.setLastApplied(lastAppliedIndex);
leader = new Leader(actorContext);
leader = new Leader(actorContext);
- // Ignore initial heartbeat.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new SendInstallSnapshot(toByteString(leadersSnapshot)));
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+ Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
assertTrue(raftBehavior instanceof Leader);
assertTrue(raftBehavior instanceof Leader);
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertNotNull(installSnapshot.getData());
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertNotNull(installSnapshot.getData());
- assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
assertEquals(currentTerm, installSnapshot.getTerm());
assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
assertEquals(currentTerm, installSnapshot.getTerm());
MockRaftActorContext actorContext = createActorContextWithFollower();
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
// Ignore initial heartbeat.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Ignore initial heartbeat.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
assertEquals(1, leader.followerLogSize());
FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
assertNotNull(fli);
assertEquals(1, leader.followerLogSize());
FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
assertNotNull(fli);
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex + 1, fli.getNextIndex());
+ assertEquals(commitIndex, fli.getMatchIndex());
+ assertEquals(commitIndex + 1, fli.getNextIndex());
MockRaftActorContext actorContext = createActorContextWithFollower();
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
actorContext.setConfigParams(configParams);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
actorContext.setConfigParams(configParams);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
MockRaftActorContext actorContext = createActorContextWithFollower();
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
MockRaftActorContext actorContext = createActorContextWithFollower();
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);