Initial implementation of saving and installing snapshots
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index ec4551ac4ffb7cae84c9ce9fd82a652499017d98..90edf7da9a8dc5660f13f1eb783543f1bc8c17c1 100644 (file)
@@ -22,12 +22,16 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +68,9 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private final Map<String, ActorSelection> followerToActor = new HashMap<>();
 
-    private Cancellable heartbeatCancel = null;
+    private Cancellable heartbeatSchedule = null;
+    private Cancellable appendEntriesSchedule = null;
+    private Cancellable installSnapshotSchedule = null;
 
     private List<ClientRequestTracker> trackerList = new ArrayList<>();
 
@@ -103,6 +109,10 @@ public class Leader extends AbstractRaftActorBehavior {
         // prevent election timeouts (ยง5.2)
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
 
+        scheduleInstallSnapshotCheck(
+            new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
+                HEART_BEAT_INTERVAL.unit())
+        );
 
     }
 
@@ -194,8 +204,14 @@ public class Leader extends AbstractRaftActorBehavior {
         try {
             if (message instanceof SendHeartBeat) {
                 return sendHeartBeat();
+            } else if(message instanceof SendInstallSnapshot) {
+                installSnapshotIfNeeded();
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
+            } else if (message instanceof InstallSnapshotReply){
+                // FIXME : Should I be checking the term here too?
+                handleInstallSnapshotReply(
+                    (InstallSnapshotReply) message);
             }
         } finally {
             scheduleHeartBeat(HEART_BEAT_INTERVAL);
@@ -204,6 +220,18 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshotReply(InstallSnapshotReply message) {
+        InstallSnapshotReply reply = message;
+        String followerId = reply.getFollowerId();
+        FollowerLogInformation followerLogInformation =
+            followerToLog.get(followerId);
+
+        followerLogInformation
+            .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
+        followerLogInformation
+            .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+    }
+
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
@@ -244,8 +272,12 @@ public class Leader extends AbstractRaftActorBehavior {
 
             long nextIndex = followerLogInformation.getNextIndex().get();
 
-            List<ReplicatedLogEntry> entries =
-                context.getReplicatedLog().getFrom(nextIndex);
+            List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+            if(context.getReplicatedLog().isPresent(nextIndex)){
+                entries =
+                    context.getReplicatedLog().getFrom(nextIndex);
+            }
 
             followerActor.tell(
                 new AppendEntries(currentTerm(), context.getId(),
@@ -257,6 +289,29 @@ public class Leader extends AbstractRaftActorBehavior {
         }
     }
 
+    private void installSnapshotIfNeeded(){
+        for (String followerId : followerToActor.keySet()) {
+            ActorSelection followerActor =
+                followerToActor.get(followerId);
+
+            FollowerLogInformation followerLogInformation =
+                followerToLog.get(followerId);
+
+            long nextIndex = followerLogInformation.getNextIndex().get();
+
+            if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm(),
+                        context.getReplicatedLog().getSnapshot()
+                    ),
+                    actor()
+                );
+            }
+        }
+    }
+
     private RaftState sendHeartBeat() {
         if (followerToActor.size() > 0) {
             sendAppendEntries();
@@ -265,8 +320,14 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     private void stopHeartBeat() {
-        if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
-            heartbeatCancel.cancel();
+        if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+            heartbeatSchedule.cancel();
+        }
+    }
+
+    private void stopInstallSnapshotSchedule() {
+        if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+            installSnapshotSchedule.cancel();
         }
     }
 
@@ -284,13 +345,34 @@ public class Leader extends AbstractRaftActorBehavior {
         // Scheduling the heartbeat only once here because heartbeats do not
         // need to be sent if there are other messages being sent to the remote
         // actor.
-        heartbeatCancel =
+        heartbeatSchedule =
             context.getActorSystem().scheduler().scheduleOnce(
                 interval,
                 context.getActor(), new SendHeartBeat(),
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
+
+    private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+        if(followerToActor.keySet().size() == 0){
+            // Optimization - do not bother scheduling a heartbeat as there are
+            // no followers
+            return;
+        }
+
+        stopInstallSnapshotSchedule();
+
+        // Schedule a message to send append entries to followers that can
+        // accept an append entries with some data in it
+        installSnapshotSchedule =
+            context.getActorSystem().scheduler().scheduleOnce(
+                interval,
+                context.getActor(), new SendInstallSnapshot(),
+                context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+
+
     @Override public void close() throws Exception {
         stopHeartBeat();
     }