import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private final Map<String, ActorSelection> followerToActor = new HashMap<>();
- private Cancellable heartbeatCancel = null;
+ private Cancellable heartbeatSchedule = null;
+ private Cancellable appendEntriesSchedule = null;
+ private Cancellable installSnapshotSchedule = null;
private List<ClientRequestTracker> trackerList = new ArrayList<>();
// prevent election timeouts (ยง5.2)
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ scheduleInstallSnapshotCheck(
+ new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
+ HEART_BEAT_INTERVAL.unit())
+ );
}
try {
if (message instanceof SendHeartBeat) {
return sendHeartBeat();
+ } else if(message instanceof SendInstallSnapshot) {
+ installSnapshotIfNeeded();
} else if (message instanceof Replicate) {
replicate((Replicate) message);
+ } else if (message instanceof InstallSnapshotReply){
+ // FIXME : Should I be checking the term here too?
+ handleInstallSnapshotReply(
+ (InstallSnapshotReply) message);
}
} finally {
scheduleHeartBeat(HEART_BEAT_INTERVAL);
return super.handleMessage(sender, message);
}
+ private void handleInstallSnapshotReply(InstallSnapshotReply message) {
+ InstallSnapshotReply reply = message;
+ String followerId = reply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ followerLogInformation
+ .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
+ followerLogInformation
+ .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+ }
+
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
long nextIndex = followerLogInformation.getNextIndex().get();
- List<ReplicatedLogEntry> entries =
- context.getReplicatedLog().getFrom(nextIndex);
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ if(context.getReplicatedLog().isPresent(nextIndex)){
+ entries =
+ context.getReplicatedLog().getFrom(nextIndex);
+ }
followerActor.tell(
new AppendEntries(currentTerm(), context.getId(),
}
}
+ private void installSnapshotIfNeeded(){
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().getSnapshot()
+ ),
+ actor()
+ );
+ }
+ }
+ }
+
private RaftState sendHeartBeat() {
if (followerToActor.size() > 0) {
sendAppendEntries();
}
private void stopHeartBeat() {
- if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
- heartbeatCancel.cancel();
+ if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+ heartbeatSchedule.cancel();
+ }
+ }
+
+ private void stopInstallSnapshotSchedule() {
+ if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+ installSnapshotSchedule.cancel();
}
}
// Scheduling the heartbeat only once here because heartbeats do not
// need to be sent if there are other messages being sent to the remote
// actor.
- heartbeatCancel =
+ heartbeatSchedule =
context.getActorSystem().scheduler().scheduleOnce(
interval,
context.getActor(), new SendHeartBeat(),
context.getActorSystem().dispatcher(), context.getActor());
}
+
+ private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+ if(followerToActor.keySet().size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
+ stopInstallSnapshotSchedule();
+
+ // Schedule a message to send append entries to followers that can
+ // accept an append entries with some data in it
+ installSnapshotSchedule =
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
+ context.getActor(), new SendInstallSnapshot(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+
+
@Override public void close() throws Exception {
stopHeartBeat();
}