import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private final Map<String, ActorSelection> followerToActor = new HashMap<>();
- private Cancellable heartbeatCancel = null;
+ private Cancellable heartbeatSchedule = null;
+ private Cancellable appendEntriesSchedule = null;
+ private Cancellable installSnapshotSchedule = null;
private List<ClientRequestTracker> trackerList = new ArrayList<>();
public Leader(RaftActorContext context) {
super(context);
- if(lastIndex() >= 0) {
+ if (lastIndex() >= 0) {
context.setCommitIndex(lastIndex());
}
context.actorSelection(context.getPeerAddress(followerId)));
followerToLog.put(followerId, followerLogInformation);
-
}
+ context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
+
if (followerToActor.size() > 0) {
minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
} else {
// prevent election timeouts (§5.2)
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ scheduleInstallSnapshotCheck(
+ new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
+ HEART_BEAT_INTERVAL.unit())
+ );
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
-
- context.getLogger()
- .error("An unexpected AppendEntries received in state " + state());
+ AppendEntries appendEntries) {
- return suggestedState;
+ return state();
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-
- // Do not take any other action since a behavior change is coming
- if (suggestedState != state())
- return suggestedState;
+ AppendEntriesReply appendEntriesReply) {
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
followerLogInformation
.setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
} else {
+
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about
+
followerLogInformation.decrNextIndex();
}
}
}
- if (replicatedCount >= minReplicationCount){
+ if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry =
context.getReplicatedLog().get(N);
if (replicatedLogEntry != null
}
}
- if(context.getCommitIndex() > context.getLastApplied()){
+ // Apply the change to the state machine
+ if (context.getCommitIndex() > context.getLastApplied()) {
applyLogToStateMachine(context.getCommitIndex());
}
- return suggestedState;
+ return state();
}
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
}
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ RequestVoteReply requestVoteReply) {
+ return state();
}
@Override public RaftState state() {
return RaftState.Leader;
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
+ Object message = fromSerializableMessage(originalMessage);
+
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ return RaftState.Follower;
+ }
+ }
+
try {
if (message instanceof SendHeartBeat) {
return sendHeartBeat();
+ } else if(message instanceof SendInstallSnapshot) {
+ installSnapshotIfNeeded();
} else if (message instanceof Replicate) {
-
- Replicate replicate = (Replicate) message;
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- context.getLogger().debug("Replicate message " + logIndex);
-
- if (followerToActor.size() == 0) {
- context.setCommitIndex(
- replicate.getReplicatedLogEntry().getIndex());
-
- context.getActor()
- .tell(new ApplyState(replicate.getClientActor(),
- replicate.getIdentifier(),
- replicate.getReplicatedLogEntry()),
- context.getActor()
- );
- } else {
-
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- ReplicatedLogEntry prevEntry =
- context.getReplicatedLog().get(lastIndex() - 1);
- long prevLogIndex = -1;
- long prevLogTerm = -1;
- if (prevEntry != null) {
- prevLogIndex = prevEntry.getIndex();
- prevLogTerm = prevEntry.getTerm();
- }
- // Send an AppendEntries to all followers
- for (String followerId : followerToActor.keySet()) {
- ActorSelection followerActor =
- followerToActor.get(followerId);
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex, prevLogTerm,
- context.getReplicatedLog().getFrom(
- followerLogInformation.getNextIndex()
- .get()
- ), context.getCommitIndex()
- ),
- actor()
- );
- }
- }
+ replicate((Replicate) message);
+ } else if (message instanceof InstallSnapshotReply){
+ handleInstallSnapshotReply(
+ (InstallSnapshotReply) message);
}
} finally {
scheduleHeartBeat(HEART_BEAT_INTERVAL);
return super.handleMessage(sender, message);
}
- private RaftState sendHeartBeat() {
- if (followerToActor.size() > 0) {
- for (String follower : followerToActor.keySet()) {
+ private void handleInstallSnapshotReply(InstallSnapshotReply message) {
+ InstallSnapshotReply reply = message;
+ String followerId = reply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ followerLogInformation
+ .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
+ followerLogInformation
+ .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+ }
- FollowerLogInformation followerLogInformation =
- followerToLog.get(follower);
+ private void replicate(Replicate replicate) {
+ long logIndex = replicate.getReplicatedLogEntry().getIndex();
- AtomicLong nextIndex =
- followerLogInformation.getNextIndex();
+ context.getLogger().debug("Replicate message " + logIndex);
- List<ReplicatedLogEntry> entries =
- context.getReplicatedLog().getFrom(nextIndex.get());
+ if (followerToActor.size() == 0) {
+ context.setCommitIndex(
+ replicate.getReplicatedLogEntry().getIndex());
- followerToActor.get(follower).tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm(),
- context.getId(),
- context.getReplicatedLog().lastIndex(),
- context.getReplicatedLog().lastTerm(),
- entries, context.getCommitIndex()),
+ context.getActor()
+ .tell(new ApplyState(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ replicate.getReplicatedLogEntry()),
context.getActor()
);
+ } else {
+
+ // Create a tracker entry we will use this later to notify the
+ // client actor
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
+
+ sendAppendEntries();
+ }
+ }
+
+ private void sendAppendEntries() {
+ // Send an AppendEntries to all followers
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ if(context.getReplicatedLog().isPresent(nextIndex)){
+ // TODO: Instead of sending all entries from nextIndex
+ // only send a fixed number of entries to each follower
+ // This is to avoid the situation where there are a lot of
+ // entries to install for a fresh follower or to a follower
+ // that has fallen too far behind with the log but yet is not
+ // eligible to receive a snapshot
+ entries =
+ context.getReplicatedLog().getFrom(nextIndex);
+ }
+
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(), prevLogIndex(nextIndex),
+ prevLogTerm(nextIndex), entries, context.getCommitIndex()).toSerializable(),
+ actor());
+ }
+ }
+
+ /**
+ * An installSnapshot is scheduled at a interval that is a multiple of
+ * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+ * snapshots at every heartbeat.
+ */
+ private void installSnapshotIfNeeded(){
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().getSnapshot()
+ ),
+ actor()
+ );
}
}
+ }
+
+ private RaftState sendHeartBeat() {
+ if (followerToActor.size() > 0) {
+ sendAppendEntries();
+ }
return state();
}
private void stopHeartBeat() {
- if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
- heartbeatCancel.cancel();
+ if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+ heartbeatSchedule.cancel();
+ }
+ }
+
+ private void stopInstallSnapshotSchedule() {
+ if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+ installSnapshotSchedule.cancel();
}
}
private void scheduleHeartBeat(FiniteDuration interval) {
+ if(followerToActor.keySet().size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
stopHeartBeat();
// Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
// Scheduling the heartbeat only once here because heartbeats do not
// need to be sent if there are other messages being sent to the remote
// actor.
- heartbeatCancel =
+ heartbeatSchedule =
context.getActorSystem().scheduler().scheduleOnce(
interval,
context.getActor(), new SendHeartBeat(),
context.getActorSystem().dispatcher(), context.getActor());
}
+
+ private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+ if(followerToActor.keySet().size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
+ stopInstallSnapshotSchedule();
+
+ // Schedule a message to send append entries to followers that can
+ // accept an append entries with some data in it
+ installSnapshotSchedule =
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
+ context.getActor(), new SendInstallSnapshot(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+
+
@Override public void close() throws Exception {
stopHeartBeat();
}