This commit is based on a code walkthrough done with Colin Dixon and
Abhishek Kumar.
TODOs and FIXMEs have been added to guide further development.
The major things that still need to be completed are:
1. Installing snapshots
2. Adding and removing a new peer with consensus
3. Optimizing AppendEntries (faster synchronization)
Change-Id: Ic788e050fe8fa591176a927906004fd2277e29fa
Signed-off-by: Moiz Raja <moraja@cisco.com>
String getVotedFor();
/**
String getVotedFor();
/**
- * Called when we need to update the current term either because we received
- * a message from someone with a more uptodate term or because we just voted
- * for someone
+ * To be called mainly when we are recovering in-memory election state from
+ * persistent storage
*
* @param currentTerm
* @param votedFor
*
* @param currentTerm
* @param votedFor
void update(long currentTerm, String votedFor);
/**
void update(long currentTerm, String votedFor);
/**
+ * To be called when we need to update the current term either because we
+ * received a message from someone with a more up-to-date term or because we
+ * just voted for someone
+ * <p>
+ * This information needs to be persisted so that on recovery the replica
+ * can start itself in the right term and know if it has already voted in
+ * that term or not
*
* @param currentTerm
* @param votedFor
*
* @param currentTerm
* @param votedFor
public RaftActor(String id, Map<String, String> peerAddresses) {
public RaftActor(String id, Map<String, String> peerAddresses) {
- final String id1 = getSelf().path().toString();
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(),
id, new ElectionTermImpl(),
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(),
id, new ElectionTermImpl(),
trimPersistentData(success.metadata().sequenceNr());
} else if (message instanceof SaveSnapshotFailure) {
trimPersistentData(success.metadata().sequenceNr());
} else if (message instanceof SaveSnapshotFailure) {
// TODO: Handle failure in saving the snapshot
// TODO: Handle failure in saving the snapshot
- } else if (message instanceof FindLeader){
- getSender().tell(new FindLeaderReply(
- context.getPeerAddress(currentBehavior.getLeaderId())),
- getSelf());
+ // Maybe do retries on failure
} else if (message instanceof AddRaftPeer){
} else if (message instanceof AddRaftPeer){
+
+ // FIXME : Do not add raft peers like this.
+ // When adding a new Peer we have to ensure that a majority of
+ // the peers know about the new Peer. Doing it this way may cause
+ // a situation where multiple Leaders may emerge
AddRaftPeer arp = (AddRaftPeer)message;
context.addToPeers(arp.getName(), arp.getAddress());
} else if (message instanceof RemoveRaftPeer){
AddRaftPeer arp = (AddRaftPeer)message;
context.addToPeers(arp.getName(), arp.getAddress());
} else if (message instanceof RemoveRaftPeer){
RemoveRaftPeer rrp = (RemoveRaftPeer)message;
context.removePeer(rrp.getName());
RemoveRaftPeer rrp = (RemoveRaftPeer)message;
context.removePeer(rrp.getName());
RaftState state =
currentBehavior.handleMessage(getSender(), message);
currentBehavior = switchBehavior(state);
RaftState state =
currentBehavior.handleMessage(getSender(), message);
currentBehavior = switchBehavior(state);
- * Remove all the entries from the logs >= index
+ * To be called when we need to remove entries from the in-memory log.
+ * This method will remove all entries >= index. This method should be used
+ * during recovery to appropriately trim the log based on persisted
+ * information
*
* @param index the index of the log entry
*/
*
* @param index the index of the log entry
*/
- * Remove all entries starting from the specified entry and persist the
- * information to disk
+ * To be called when we need to remove entries from the in-memory log and we
+ * need that information persisted to disk. This method will remove all
+ * entries >= index.
+ * <p>
+ * The persisted information would then be used during recovery to properly
+ * reconstruct the state of the in-memory replicated log
+ * @param index the index of the log entry
*/
void removeFromAndPersist(long index);
*/
void removeFromAndPersist(long index);
context.getLogger().debug("Starting new term " + (currentTerm+1));
// Request for a vote
context.getLogger().debug("Starting new term " + (currentTerm+1));
// Request for a vote
+ // TODO: Retry request for vote if replies do not arrive in a reasonable
+ // amount of time TBD
for (ActorSelection peerActor : peerToActor.values()) {
peerActor.tell(new RequestVote(
context.getTermInformation().getCurrentTerm(),
for (ActorSelection peerActor : peerToActor.values()) {
peerActor.tell(new RequestVote(
context.getTermInformation().getCurrentTerm(),
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
+ // TODO : Refactor this method into a bunch of smaller methods
+ // to make it easier to read. Before refactoring ensure tests
+ // cover the code properly
+
+ // 1. Reply false if term < currentTerm (§5.1)
+ // This is handled in the appendEntries method of the base class
+
// If we got here then we do appear to be talking to the leader
leaderId = appendEntries.getLeaderId();
// 2. Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
// If we got here then we do appear to be talking to the leader
leaderId = appendEntries.getLeaderId();
// 2. Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
ReplicatedLogEntry previousEntry = context.getReplicatedLog()
.get(appendEntries.getPrevLogIndex());
ReplicatedLogEntry previousEntry = context.getReplicatedLog()
.get(appendEntries.getPrevLogIndex());
- boolean noMatchingTerms = true;
+ boolean outOfSync = true;
+ // First check if the logs are in sync or not
if (lastIndex() == -1
&& appendEntries.getPrevLogIndex() != -1) {
if (lastIndex() == -1
&& appendEntries.getPrevLogIndex() != -1) {
+ // The follower's log is out of sync because the leader does have
+ // an entry at prevLogIndex and this follower has no entries in
+ // its log.
+
context.getLogger().debug(
"The followers log is empty and the senders prevLogIndex is {}",
appendEntries.getPrevLogIndex());
context.getLogger().debug(
"The followers log is empty and the senders prevLogIndex is {}",
appendEntries.getPrevLogIndex());
&& appendEntries.getPrevLogIndex() != -1
&& previousEntry == null) {
&& appendEntries.getPrevLogIndex() != -1
&& previousEntry == null) {
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry was not found in its log
+
context.getLogger().debug(
"The log is not empty but the prevLogIndex {} was not found in it",
appendEntries.getPrevLogIndex());
context.getLogger().debug(
"The log is not empty but the prevLogIndex {} was not found in it",
appendEntries.getPrevLogIndex());
&& previousEntry != null
&& previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
&& previousEntry != null
&& previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry does exist in the follower's log but it has
+ // a different term in it
+
context.getLogger().debug(
"Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
, previousEntry.getTerm()
, appendEntries.getPrevLogTerm());
} else {
context.getLogger().debug(
"Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
, previousEntry.getTerm()
, appendEntries.getPrevLogTerm());
} else {
- noMatchingTerms = false;
+ if (outOfSync) {
+ // We found that the log was out of sync so just send a negative
+ // reply and return
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
// follow it (§5.3)
int addEntriesFrom = 0;
if (context.getReplicatedLog().size() > 0) {
// follow it (§5.3)
int addEntriesFrom = 0;
if (context.getReplicatedLog().size() > 0) {
+
+ // Find the first entry sent by the leader that is not already in
+ // the follower's log
for (int i = 0;
i < appendEntries.getEntries()
.size(); i++, addEntriesFrom++) {
for (int i = 0;
i < appendEntries.getEntries()
.size(); i++, addEntriesFrom++) {
- if (newEntry != null && newEntry.getTerm() == matchEntry
+ if (newEntry.getTerm() == matchEntry
.getTerm()) {
continue;
}
.getTerm()) {
continue;
}
- if (newEntry != null && newEntry.getTerm() != matchEntry
- .getTerm()) {
- context.getLogger().debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
- );
- context.getReplicatedLog()
- .removeFromAndPersist(matchEntry.getIndex());
- break;
- }
+
+ context.getLogger().debug(
+ "Removing entries from log starting at "
+ + matchEntry.getIndex()
+ );
+
+ // Entries do not match so remove all subsequent entries
+ context.getReplicatedLog()
+ .removeFromAndPersist(matchEntry.getIndex());
+ break;
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
context.getLogger().debug(
"Append entry to log " + appendEntries.getEntries().get(i).getData()
.toString()
context.getLogger().debug(
"Append entry to log " + appendEntries.getEntries().get(i).getData()
.toString()
followerLogInformation
.setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
} else {
followerLogInformation
.setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
} else {
+
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that follower's next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it, but it may be something for
+ // us to think about
+
followerLogInformation.decrNextIndex();
}
followerLogInformation.decrNextIndex();
}
} else if (message instanceof Replicate) {
replicate((Replicate) message);
} else if (message instanceof InstallSnapshotReply){
} else if (message instanceof Replicate) {
replicate((Replicate) message);
} else if (message instanceof InstallSnapshotReply){
- // FIXME : Should I be checking the term here too?
handleInstallSnapshotReply(
(InstallSnapshotReply) message);
}
handleInstallSnapshotReply(
(InstallSnapshotReply) message);
}
List<ReplicatedLogEntry> entries = Collections.emptyList();
if(context.getReplicatedLog().isPresent(nextIndex)){
List<ReplicatedLogEntry> entries = Collections.emptyList();
if(context.getReplicatedLog().isPresent(nextIndex)){
+ // TODO: Instead of sending all entries from nextIndex
+ // only send a fixed number of entries to each follower
+ // This is to avoid the situation where there are a lot of
+ // entries to install for a fresh follower or to a follower
+ // that has fallen too far behind with the log but yet is not
+ // eligible to receive a snapshot
entries =
context.getReplicatedLog().getFrom(nextIndex);
}
entries =
context.getReplicatedLog().getFrom(nextIndex);
}
+ /**
+ * An installSnapshot is scheduled at an interval that is a multiple of
+ * HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+ * snapshots at every heartbeat.
+ */
private void installSnapshotIfNeeded(){
for (String followerId : followerToActor.keySet()) {
ActorSelection followerActor =
private void installSnapshotIfNeeded(){
for (String followerId : followerToActor.keySet()) {
ActorSelection followerActor =
assertEquals(4, log.last().getIndex() + 1);
assertNotNull(log.get(2));
assertEquals(4, log.last().getIndex() + 1);
assertNotNull(log.get(2));
- // Check that the entry at index 2 has the new data
assertEquals("one", log.get(1).getData());
assertEquals("one", log.get(1).getData());
+
+ // Check that the entry at index 2 has the new data
assertEquals("two-1", log.get(2).getData());
assertEquals("two-1", log.get(2).getData());
assertEquals("three", log.get(3).getData());
assertNotNull(log.get(3));
assertEquals("three", log.get(3).getData());
assertNotNull(log.get(3));