String getVotedFor();
/**
- * Called when we need to update the current term either because we received
- * a message from someone with a more uptodate term or because we just voted
- * for someone
+ * To be called mainly when we are recovering in-memory election state from
+ * persistent storage
*
* @param currentTerm
* @param votedFor
void update(long currentTerm, String votedFor);
/**
+ * To be called when we need to update the current term either because we
+ * received a message from someone with a more up-to-date term or because we
+ * just voted for someone
+ * <p>
+ * This information needs to be persisted so that on recovery the replica
+ * can start itself in the right term and know if it has already voted in
+ * that term or not
*
* @param currentTerm
* @param votedFor
public RaftActor(String id, Map<String, String> peerAddresses) {
- final String id1 = getSelf().path().toString();
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(),
id, new ElectionTermImpl(),
trimPersistentData(success.metadata().sequenceNr());
} else if (message instanceof SaveSnapshotFailure) {
+
// TODO: Handle failure in saving the snapshot
- } else if (message instanceof FindLeader){
- getSender().tell(new FindLeaderReply(
- context.getPeerAddress(currentBehavior.getLeaderId())),
- getSelf());
+ // Maybe do retries on failure
} else if (message instanceof AddRaftPeer){
+
+ // FIXME : Do not add raft peers like this.
+ // When adding a new Peer we have to ensure that the a majority of
+ // the peers know about the new Peer. Doing it this way may cause
+ // a situation where multiple Leaders may emerge
AddRaftPeer arp = (AddRaftPeer)message;
context.addToPeers(arp.getName(), arp.getAddress());
} else if (message instanceof RemoveRaftPeer){
+
RemoveRaftPeer rrp = (RemoveRaftPeer)message;
context.removePeer(rrp.getName());
+
} else {
+
RaftState state =
currentBehavior.handleMessage(getSender(), message);
currentBehavior = switchBehavior(state);
long lastTerm();
/**
- * Remove all the entries from the logs >= index
+ * To be called when we need to remove entries from the in-memory log.
+ * This method will remove all entries >= index. This method should be used
+ * during recovery to appropriately trim the log based on persisted
+ * information
*
* @param index the index of the log entry
*/
/**
- * Remove all entries starting from the specified entry and persist the
- * information to disk
+ * To be called when we need to remove entries from the in-memory log and we
+ * need that information persisted to disk. This method will remove all
+ * entries >= index.
+ * <p>
+ * The persisted information would then be used during recovery to properly
+ * reconstruct the state of the in-memory replicated log
*
- * @param index
+ * @param index the index of the log entry
*/
void removeFromAndPersist(long index);
context.getLogger().debug("Starting new term " + (currentTerm+1));
// Request for a vote
+ // TODO: Retry request for vote if replies do not arrive in a reasonable
+ // amount of time TBD
for (ActorSelection peerActor : peerToActor.values()) {
peerActor.tell(new RequestVote(
context.getTermInformation().getCurrentTerm(),
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
+ // TODO : Refactor this method into a bunch of smaller methods
+ // to make it easier to read. Before refactoring ensure tests
+ // cover the code properly
+
+ // 1. Reply false if term < currentTerm (§5.1)
+ // This is handled in the appendEntries method of the base class
+
// If we got here then we do appear to be talking to the leader
leaderId = appendEntries.getLeaderId();
// 2. Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
+
ReplicatedLogEntry previousEntry = context.getReplicatedLog()
.get(appendEntries.getPrevLogIndex());
- boolean noMatchingTerms = true;
+ boolean outOfSync = true;
+ // First check if the logs are in sync or not
if (lastIndex() == -1
&& appendEntries.getPrevLogIndex() != -1) {
+ // The follower's log is out of sync because the leader does have
+ // an entry at prevLogIndex and this follower has no entries in
+ // it's log.
+
context.getLogger().debug(
"The followers log is empty and the senders prevLogIndex is {}",
appendEntries.getPrevLogIndex());
&& appendEntries.getPrevLogIndex() != -1
&& previousEntry == null) {
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry was not found in it's log
+
context.getLogger().debug(
"The log is not empty but the prevLogIndex {} was not found in it",
appendEntries.getPrevLogIndex());
&& previousEntry != null
&& previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry does exist in the follower's log but it has
+ // a different term in it
+
context.getLogger().debug(
"Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
, previousEntry.getTerm()
, appendEntries.getPrevLogTerm());
} else {
- noMatchingTerms = false;
+ outOfSync = false;
}
- if (noMatchingTerms) {
+ if (outOfSync) {
+ // We found that the log was out of sync so just send a negative
+ // reply and return
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
// follow it (§5.3)
int addEntriesFrom = 0;
if (context.getReplicatedLog().size() > 0) {
+
+ // Find the entry up until which the one that is not in the
+ // follower's log
for (int i = 0;
i < appendEntries.getEntries()
.size(); i++, addEntriesFrom++) {
break;
}
- if (newEntry != null && newEntry.getTerm() == matchEntry
+ if (newEntry.getTerm() == matchEntry
.getTerm()) {
continue;
}
- if (newEntry != null && newEntry.getTerm() != matchEntry
- .getTerm()) {
- context.getLogger().debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
- );
- context.getReplicatedLog()
- .removeFromAndPersist(matchEntry.getIndex());
- break;
- }
+
+ context.getLogger().debug(
+ "Removing entries from log starting at "
+ + matchEntry.getIndex()
+ );
+
+ // Entries do not match so remove all subsequent entries
+ context.getReplicatedLog()
+ .removeFromAndPersist(matchEntry.getIndex());
+ break;
}
}
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
+
context.getLogger().debug(
"Append entry to log " + appendEntries.getEntries().get(i).getData()
.toString()
followerLogInformation
.setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
} else {
+
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about
+
followerLogInformation.decrNextIndex();
}
} else if (message instanceof Replicate) {
replicate((Replicate) message);
} else if (message instanceof InstallSnapshotReply){
- // FIXME : Should I be checking the term here too?
handleInstallSnapshotReply(
(InstallSnapshotReply) message);
}
List<ReplicatedLogEntry> entries = Collections.emptyList();
if(context.getReplicatedLog().isPresent(nextIndex)){
+ // TODO: Instead of sending all entries from nextIndex
+ // only send a fixed number of entries to each follower
+ // This is to avoid the situation where there are a lot of
+ // entries to install for a fresh follower or to a follower
+ // that has fallen too far behind with the log but yet is not
+ // eligible to receive a snapshot
entries =
context.getReplicatedLog().getFrom(nextIndex);
}
}
}
+ /**
+ * An installSnapshot is scheduled at a interval that is a multiple of
+ * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+ * snapshots at every heartbeat.
+ */
private void installSnapshotIfNeeded(){
for (String followerId : followerToActor.keySet()) {
ActorSelection followerActor =
assertEquals(4, log.last().getIndex() + 1);
assertNotNull(log.get(2));
- // Check that the entry at index 2 has the new data
+
assertEquals("one", log.get(1).getData());
+
+ // Check that the entry at index 2 has the new data
assertEquals("two-1", log.get(2).getData());
+
assertEquals("three", log.get(3).getData());
assertNotNull(log.get(3));