import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
private Leader leader;
+ private final short payloadVersion = 5;
@Override
@After
assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
+ }
+
+ @Test
+ public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+ logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setRaftPolicy(createRaftPolicy(true, true));
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
}
@Test
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int newEntryIndex = 4;
final int snapshotTerm = 1;
final int currentTerm = 2;
// set the snapshot variables in replicatedlog
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
//set follower timeout to 2 mins, helps during debugging
actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
// new entry
ReplicatedLogImplEntry entry =
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
- assertEquals(snapshotIndex, is.getLastIncludedIndex());
+ assertEquals(commitIndex, is.getLastIncludedIndex());
}
@Test
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(Optional.<ByteString>absent());
+ leader.setSnapshot(null);
// new entry
ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int lastAppliedIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(lastAppliedIndex);
+ actorContext.setLastApplied(lastAppliedIndex);
leader = new Leader(actorContext);
- // Ignore initial heartbeat.
+ // Initial heartbeat.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new SendInstallSnapshot(toByteString(leadersSnapshot)));
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+ Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
assertTrue(raftBehavior instanceof Leader);
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertNotNull(installSnapshot.getData());
- assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
assertEquals(currentTerm, installSnapshot.getTerm());
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
// Ignore initial heartbeat.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
assertEquals(1, leader.followerLogSize());
FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
assertNotNull(fli);
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex + 1, fli.getNextIndex());
+ assertEquals(commitIndex, fli.getMatchIndex());
+ assertEquals(commitIndex + 1, fli.getNextIndex());
}
@Test
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
actorContext.setConfigParams(configParams);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
}
});
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
final int snapshotTerm = 1;
final int currentTerm = 2;
}
});
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
configParams.setElectionTimeoutFactor(100000);
MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
context.setConfigParams(configParams);
+ context.setPayloadVersion(payloadVersion);
return context;
}
leader = new Leader(leaderActorContext);
+ assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+
short payloadVersion = 5;
AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(1000, TimeUnit.SECONDS));
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
leaderActorContext.setReplicatedLog(
new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());