Initial implementation of saving and installing snapshots 87/9087/6
authorMoiz Raja <moraja@cisco.com>
Wed, 16 Jul 2014 16:17:32 +0000 (09:17 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:37:04 +0000 (15:37 -0700)
This commit basically sketches out how creating and installing snapshots would fit into
the overall design. This has not been tested yet.

Change-Id: I0c575049e8dbda6b63e49dab80ff0980546029d3
Signed-off-by: Moiz Raja <moraja@cisco.com>
13 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 35a2c98..90cc247 100644 (file)
@@ -65,6 +65,15 @@ public class ExampleActor extends RaftActor {
         }
     }
 
+    @Override protected Object createSnapshot() {
+        return state;
+    }
+
+    @Override protected void applySnapshot(Object snapshot) {
+        state.clear();
+        state.putAll((HashMap) snapshot);
+    }
+
     @Override public void onReceiveRecover(Object message) {
         super.onReceiveRecover(message);
     }
index 27d083f..a148ed4 100644 (file)
@@ -10,11 +10,14 @@ package org.opendaylight.controller.cluster.example;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class Main {
@@ -41,14 +44,43 @@ public class Main {
             actorSystem.actorOf(ExampleActor.props("example-3",
                 withoutPeer("example-3")), "example-3");
 
+
+        List<ActorRef> examples = Arrays.asList(example1Actor, example2Actor, example3Actor);
+
         ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor));
         BufferedReader br =
             new BufferedReader(new InputStreamReader(System.in));
 
+        System.out.println("Usage :");
+        System.out.println("s <1-3> to start a peer");
+        System.out.println("k <1-3> to kill a peer");
+
         while(true) {
-            System.out.print("Enter Integer (0 to exit):");
+            System.out.print("Enter command (0 to exit):");
             try {
-                int i = Integer.parseInt(br.readLine());
+                String s = br.readLine();
+                String[] split = s.split(" ");
+                if(split.length > 1) {
+                    String command = split[0];
+                    String actor = split[1];
+
+                    if ("k".equals(command)) {
+                        int i = Integer.parseInt(actor);
+                        examples.get(i - 1)
+                            .tell(PoisonPill.getInstance(), null);
+                        continue;
+                    } else if ("s".equals(command)) {
+                        int i = Integer.parseInt(actor);
+                        String actorName = "example-" + i;
+                        examples.add(i - 1,
+                            actorSystem.actorOf(ExampleActor.props(actorName,
+                                withoutPeer(actorName)), actorName));
+                        System.out.println("Created actor : " + actorName);
+                        continue;
+                    }
+                }
+
+                int i = Integer.parseInt(s);
                 if(i == 0){
                     System.exit(0);
                 }
index 826faf7..f38ef18 100644 (file)
@@ -14,6 +14,10 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -21,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
 
@@ -33,50 +38,48 @@ import java.util.Map;
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
- *     In Search of an Understandable Consensus Algorithm</a>
- * <p>
+ * In Search of an Understandable Consensus Algorithm</a>
+ * <p/>
  * RaftActor has 3 states and each state has a certain behavior associated
  * with it. A Raft actor can behave as,
  * <ul>
- *     <li> A Leader </li>
- *     <li> A Follower (or) </li>
- *     <li> A Candidate </li>
+ * <li> A Leader </li>
+ * <li> A Follower (or) </li>
+ * <li> A Candidate </li>
  * </ul>
- *
- * <p>
+ * <p/>
+ * <p/>
  * A RaftActor MUST be a Leader in order to accept requests from clients to
  * change the state of it's encapsulated state machine. Once a RaftActor becomes
  * a Leader it is also responsible for ensuring that all followers ultimately
  * have the same log and therefore the same state machine as itself.
- *
- * <p>
+ * <p/>
+ * <p/>
  * The current behavior of a RaftActor determines how election for leadership
  * is initiated and how peer RaftActors react to request for votes.
- *
- * <p>
+ * <p/>
+ * <p/>
  * Each RaftActor also needs to know the current election term. It uses this
  * information for a couple of things. One is to simply figure out who it
  * voted for in the last election. Another is to figure out if the message
  * it received to update it's state is stale.
- *
- * <p>
+ * <p/>
+ * <p/>
  * The RaftActor uses akka-persistence to store it's replicated log.
  * Furthermore through it's behaviors a Raft Actor determines
- *
+ * <p/>
  * <ul>
  * <li> when a log entry should be persisted </li>
  * <li> when a log entry should be applied to the state machine (and) </li>
  * <li> when a snapshot should be saved </li>
  * </ul>
- *
- * <a href="http://doc.akka.io/api/akka/2.3.3/index.html#akka.persistence.UntypedEventsourcedProcessor">UntypeEventSourceProcessor</a>
  */
 public abstract class RaftActor extends UntypedPersistentActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
     /**
-     *  The current state determines the current behavior of a RaftActor
+     * The current state determines the current behavior of a RaftActor
      * A Raft Actor always starts off in the Follower State
      */
     private RaftActorBehavior currentBehavior;
@@ -93,36 +96,61 @@ public abstract class RaftActor extends UntypedPersistentActor {
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
 
-
-    public RaftActor(String id, Map<String, String> peerAddresses){
+    public RaftActor(String id, Map<String, String> peerAddresses) {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
             id, new ElectionTermImpl(getSelf().path().toString()),
             -1, -1, replicatedLog, peerAddresses, LOG);
-        currentBehavior = switchBehavior(RaftState.Follower);
     }
 
     @Override public void onReceiveRecover(Object message) {
-        if(message instanceof ReplicatedLogEntry) {
+        if (message instanceof SnapshotOffer) {
+            SnapshotOffer offer = (SnapshotOffer) message;
+            Snapshot snapshot = (Snapshot) offer.snapshot();
+
+            // Create a replicated log with the snapshot information
+            // The replicated log can be used later on to retrieve this snapshot
+            // when we need to install it on a peer
+            replicatedLog = new ReplicatedLogImpl(snapshot);
+
+            // Apply the snapshot to the actors state
+            applySnapshot(snapshot.getState());
+
+        } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
-        } else if(message instanceof RecoveryCompleted){
-            LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex());
+        } else if (message instanceof RecoveryCompleted) {
+            LOG.debug(
+                "Last index in log : " + replicatedLog.lastIndex());
+            currentBehavior = switchBehavior(RaftState.Follower);
         }
     }
 
     @Override public void onReceiveCommand(Object message) {
-        if(message instanceof ApplyState){
+        if (message instanceof ApplyState) {
 
-            ApplyState applyState = (ApplyState)  message;
+            ApplyState applyState = (ApplyState) message;
 
-            LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex());
+            LOG.debug("Applying state for log index {}",
+                applyState.getReplicatedLogEntry().getIndex());
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
-        } else if(message instanceof FindLeader){
-            getSender().tell(new FindLeaderReply(
-                context.getPeerAddress(currentBehavior.getLeaderId())),
-                getSelf());
+        } else if(message instanceof ApplySnapshot ) {
+            applySnapshot(((ApplySnapshot) message).getSnapshot());
+        } else if (message instanceof FindLeader) {
+            getSender().tell(
+                new FindLeaderReply(
+                    context.getPeerAddress(currentBehavior.getLeaderId())),
+                getSelf()
+            );
+        } else if (message instanceof SaveSnapshotSuccess) {
+            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+
+            // TODO: Not sure if we want to be this aggressive with trimming stuff
+            trimPersistentData(success.metadata().sequenceNr());
+
+        } else if (message instanceof SaveSnapshotFailure) {
+            // TODO: Handle failure in saving the snapshot
         } else {
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
@@ -130,33 +158,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
-    private RaftActorBehavior switchBehavior(RaftState state){
-        if(currentBehavior != null) {
-            if (currentBehavior.state() == state) {
-                return currentBehavior;
-            }
-            LOG.info("Switching from state " + currentBehavior.state() + " to "
-                + state);
-
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.error(e, "Failed to close behavior : " + currentBehavior.state());
-            }
 
-        } else {
-            LOG.info("Switching behavior to " + state);
-        }
-        RaftActorBehavior behavior = null;
-        if(state == RaftState.Candidate){
-            behavior = new Candidate(context);
-        } else if(state == RaftState.Follower){
-            behavior = new Follower(context);
-        } else {
-            behavior = new Leader(context);
-        }
-        return behavior;
-    }
 
     /**
      * When a derived RaftActor needs to persist something it must call
@@ -166,55 +168,170 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * @param identifier
      * @param data
      */
-    protected void persistData(ActorRef clientActor, String identifier, Object data){
+    protected void persistData(ActorRef clientActor, String identifier,
+        Object data) {
         LOG.debug("Persist data " + identifier);
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
 
-        replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry);
+        replicatedLog
+            .appendAndPersist(clientActor, identifier, replicatedLogEntry);
     }
 
-    protected abstract void applyState(ActorRef clientActor, String identifier, Object data);
-
-    protected String getId(){
+    protected String getId() {
         return context.getId();
     }
 
-    protected boolean isLeader(){
+    /**
+     * Derived actors can call the isLeader method to check if the current
+     * RaftActor is the Leader or not
+     *
+     * @return true it this RaftActor is a Leader false otherwise
+     */
+    protected boolean isLeader() {
         return context.getId().equals(currentBehavior.getLeaderId());
     }
 
-    protected ActorSelection getLeader(){
+    /**
+     * Derived actor can call getLeader if they need a reference to the Leader.
+     * This would be useful for example in forwarding a request to an actor
+     * which is the leader
+     *
+     * @return A reference to the leader if known, null otherwise
+     */
+    protected ActorSelection getLeader() {
         String leaderId = currentBehavior.getLeaderId();
+        if (leaderId == null) {
+            return null;
+        }
         String peerAddress = context.getPeerAddress(leaderId);
-        LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress);
+        LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
+            + peerAddress);
         return context.actorSelection(peerAddress);
     }
 
+    /**
+     * The applyState method will be called by the RaftActor when some data
+     * needs to be applied to the actor's state
+     *
+     * @param clientActor A reference to the client who sent this message. This
+     *                    is the same reference that was passed to persistData
+     *                    by the derived actor. clientActor may be null when
+     *                    the RaftActor is behaving as a follower or during
+     *                    recovery.
+     * @param identifier  The identifier of the persisted data. This is also
+     *                    the same identifier that was passed to persistData by
+     *                    the derived actor. identifier may be null when
+     *                    the RaftActor is behaving as a follower or during
+     *                    recovery
+     * @param data        A piece of data that was persisted by the persistData call.
+     *                    This should NEVER be null.
+     */
+    protected abstract void applyState(ActorRef clientActor, String identifier,
+        Object data);
+
+    /**
+     * This method will be called by the RaftActor when a snapshot needs to be
+     * created. The derived actor should respond with its current state.
+     * <p/>
+     * During recovery the state that is returned by the derived actor will
+     * be passed back to it by calling the applySnapshot  method
+     *
+     * @return The current state of the actor
+     */
+    protected abstract Object createSnapshot();
+
+    /**
+     * This method will be called by the RaftActor during recovery to
+     * reconstruct the state of the actor.
+     * <p/>
+     * This method may also be called at any other point during normal
+     * operations when the derived actor is out of sync with it's peers
+     * and the only way to bring it in sync is by applying a snapshot
+     *
+     * @param snapshot A snapshot of the state of the actor
+     */
+    protected abstract void applySnapshot(Object snapshot);
+
+    private RaftActorBehavior switchBehavior(RaftState state) {
+        if (currentBehavior != null) {
+            if (currentBehavior.state() == state) {
+                return currentBehavior;
+            }
+            LOG.info("Switching from state " + currentBehavior.state() + " to "
+                + state);
+
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.error(e,
+                    "Failed to close behavior : " + currentBehavior.state());
+            }
+
+        } else {
+            LOG.info("Switching behavior to " + state);
+        }
+        RaftActorBehavior behavior = null;
+        if (state == RaftState.Candidate) {
+            behavior = new Candidate(context);
+        } else if (state == RaftState.Follower) {
+            behavior = new Follower(context);
+        } else {
+            behavior = new Leader(context);
+        }
+        return behavior;
+    }
+
+    private void trimPersistentData(long sequenceNumber) {
+        // Trim snapshots
+        // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
+        // For now guessing that it is ANDed.
+        deleteSnapshots(new SnapshotSelectionCriteria(
+            sequenceNumber - 100000, 43200000));
+
+        // Trim journal
+        deleteMessages(sequenceNumber);
+    }
+
     private class ReplicatedLogImpl implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> journal = new ArrayList();
-        private long snapshotIndex = 0;
-        private Object snapShot = null;
+        private final List<ReplicatedLogEntry> journal;
+        private final Object snapshot;
+        private long snapshotIndex = -1;
+        private long snapshotTerm = -1;
+
+        public ReplicatedLogImpl(Snapshot snapshot) {
+            this.snapshot = snapshot.getState();
+            this.snapshotIndex = snapshot.getLastAppliedIndex();
+            this.snapshotTerm = snapshot.getLastAppliedTerm();
+
+            this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
+        }
 
+        public ReplicatedLogImpl() {
+            this.snapshot = null;
+            this.journal = new ArrayList<>();
+        }
 
         @Override public ReplicatedLogEntry get(long index) {
-            if(index < 0 || index >= journal.size()){
+            int adjustedIndex = adjustedIndex(index);
+
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return null;
             }
 
-            return journal.get((int) (index - snapshotIndex));
+            return journal.get(adjustedIndex);
         }
 
         @Override public ReplicatedLogEntry last() {
-            if(journal.size() == 0){
+            if (journal.size() == 0) {
                 return null;
             }
             return get(journal.size() - 1);
         }
 
         @Override public long lastIndex() {
-            if(journal.size() == 0){
+            if (journal.size() == 0) {
                 return -1;
             }
 
@@ -222,7 +339,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
 
         @Override public long lastTerm() {
-            if(journal.size() == 0){
+            if (journal.size() == 0) {
                 return -1;
             }
 
@@ -231,56 +348,124 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
         @Override public void removeFrom(long index) {
-            if(index < 0 || index >= journal.size()){
+            int adjustedIndex = adjustedIndex(index);
+
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return;
             }
-            for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){
+            for (int i = adjustedIndex;
+                 i < journal.size(); i++) {
                 deleteMessage(i);
                 journal.remove(i);
             }
         }
 
-        @Override public void append(final ReplicatedLogEntry replicatedLogEntry) {
+        @Override public void append(
+            final ReplicatedLogEntry replicatedLogEntry) {
             journal.add(replicatedLogEntry);
         }
 
         @Override public List<ReplicatedLogEntry> getFrom(long index) {
+            int adjustedIndex = adjustedIndex(index);
+
             List<ReplicatedLogEntry> entries = new ArrayList<>(100);
-            if(index < 0 || index >= journal.size()){
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return entries;
             }
-            for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){
+            for (int i = adjustedIndex;
+                 i < journal.size(); i++) {
                 entries.add(journal.get(i));
             }
             return entries;
         }
 
-        @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){
+        @Override public void appendAndPersist(
+            final ReplicatedLogEntry replicatedLogEntry) {
             appendAndPersist(null, null, replicatedLogEntry);
         }
 
-        public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){
-            context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex());
+        public void appendAndPersist(final ActorRef clientActor,
+            final String identifier,
+            final ReplicatedLogEntry replicatedLogEntry) {
+            context.getLogger().debug(
+                "Append log entry and persist " + replicatedLogEntry.getIndex());
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
+
+            // When persisting events with persist it is guaranteed that the
+            // persistent actor will not receive further commands between the
+            // persist call and the execution(s) of the associated event
+            // handler. This also holds for multiple persist calls in context
+            // of a single command.
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
+                        // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
+                        if (size() > 100000) {
+                            ReplicatedLogEntry lastAppliedEntry =
+                                get(context.getLastApplied());
+                            long lastAppliedIndex = -1;
+                            long lastAppliedTerm = -1;
+                            if (lastAppliedEntry != null) {
+                                lastAppliedIndex = lastAppliedEntry.getIndex();
+                                lastAppliedTerm = lastAppliedEntry.getTerm();
+                            }
+
+                            saveSnapshot(Snapshot.create(createSnapshot(),
+                                getFrom(context.getLastApplied() + 1),
+                                lastIndex(), lastTerm(), lastAppliedIndex,
+                                lastAppliedTerm));
+                        }
                         // Send message for replication
-                        if(clientActor != null) {
+                        if (clientActor != null) {
                             currentBehavior.handleMessage(getSelf(),
                                 new Replicate(clientActor, identifier,
-                                    replicatedLogEntry));
+                                    replicatedLogEntry)
+                            );
                         }
                     }
-                });
+                }
+            );
         }
 
         @Override public long size() {
             return journal.size() + snapshotIndex;
         }
+
+        @Override public boolean isPresent(long index) {
+            int adjustedIndex = adjustedIndex(index);
+
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override public boolean isInSnapshot(long index) {
+            return index <= snapshotIndex;
+        }
+
+        @Override public Object getSnapshot() {
+            return snapshot;
+        }
+
+        @Override public long getSnapshotIndex() {
+            return snapshotIndex;
+        }
+
+        @Override public long getSnapshotTerm() {
+            return snapshotTerm;
+        }
+
+        private int adjustedIndex(long index) {
+            if(snapshotIndex < 0){
+                return (int) index;
+            }
+            return (int) (index - snapshotIndex);
+        }
     }
 
+
     private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
         Serializable {
 
@@ -288,7 +473,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         private final long term;
         private final Object payload;
 
-        public ReplicatedLogImplEntry(long index, long term, Object payload){
+        public ReplicatedLogImplEntry(long index, long term, Object payload) {
 
             this.index = index;
             this.term = term;
@@ -309,4 +494,53 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
+    private static class Snapshot implements Serializable {
+        private final Object state;
+        private final List<ReplicatedLogEntry> unAppliedEntries;
+        private final long lastIndex;
+        private final long lastTerm;
+        private final long lastAppliedIndex;
+        private final long lastAppliedTerm;
+
+        private Snapshot(Object state,
+            List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+            long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+            this.state = state;
+            this.unAppliedEntries = unAppliedEntries;
+            this.lastIndex = lastIndex;
+            this.lastTerm = lastTerm;
+            this.lastAppliedIndex = lastAppliedIndex;
+            this.lastAppliedTerm = lastAppliedTerm;
+        }
+
+
+        public static Snapshot create(Object state,
+            List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm) {
+            return new Snapshot(state, entries, lastIndex, lastTerm,
+                lastAppliedIndex, lastAppliedTerm);
+        }
+
+        public Object getState() {
+            return state;
+        }
+
+        public List<ReplicatedLogEntry> getUnAppliedEntries() {
+            return unAppliedEntries;
+        }
+
+        public long getLastTerm() {
+            return lastTerm;
+        }
+
+        public long getLastAppliedIndex() {
+            return lastAppliedIndex;
+        }
+
+        public long getLastAppliedTerm() {
+            return lastAppliedTerm;
+        }
+    }
+
+
 }
index 34e7ac9..3de0de5 100644 (file)
@@ -17,8 +17,9 @@ public interface ReplicatedLog {
     /**
      * Get a replicated log entry at the specified index
      *
-     * @param index
-     * @return
+     * @param index the index of the log entry
+     * @return the ReplicatedLogEntry at index. null if index less than 0 or
+     * greater than the size of the in-memory journal.
      */
     ReplicatedLogEntry get(long index);
 
@@ -45,7 +46,7 @@ public interface ReplicatedLog {
     /**
      * Remove all the entries from the logs >= index
      *
-     * @param index
+     * @param index the index of the log entry
      */
     void removeFrom(long index);
 
@@ -63,7 +64,7 @@ public interface ReplicatedLog {
 
     /**
      *
-     * @param index
+     * @param index the index of the log entry
      */
     List<ReplicatedLogEntry> getFrom(long index);
 
@@ -73,4 +74,43 @@ public interface ReplicatedLog {
      * @return
      */
     long size();
+
+    /**
+     * Checks if the entry at the specified index is present or not
+     *
+     * @param index the index of the log entry
+     * @return true if the entry is present in the in-memory journal
+     */
+    boolean isPresent(long index);
+
+    /**
+     * Checks if the entry is present in a snapshot
+     *
+     * @param index the index of the log entry
+     * @return true if the entry is in the snapshot. false if the entry is not
+     * in the snapshot even if the entry may be present in the replicated log
+     */
+    boolean isInSnapshot(long index);
+
+    /**
+     * Get the snapshot
+     *
+     * @return an object representing the snapshot if it exists. null otherwise
+     */
+    Object getSnapshot();
+
+    /**
+     * Get the index of the snapshot
+     *
+     * @return the index from which the snapshot was created. -1 otherwise.
+     */
+    long getSnapshotIndex();
+
+    /**
+     * Get the term of the snapshot
+     *
+     * @return the term of the index from which the snapshot was created. -1
+     * otherwise
+     */
+    long getSnapshotTerm();
 }
index 88558ca..1cfc2e0 100644 (file)
@@ -12,9 +12,11 @@ import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
@@ -165,6 +167,9 @@ public class Follower extends AbstractRaftActorBehavior {
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
         if(message instanceof ElectionTimeout){
             return RaftState.Candidate;
+        } else if(message instanceof InstallSnapshot){
+            InstallSnapshot snapshot = (InstallSnapshot) message;
+            actor().tell(new ApplySnapshot(snapshot), actor());
         }
 
         scheduleElection(electionDuration());
index ec4551a..90edf7d 100644 (file)
@@ -22,12 +22,16 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +68,9 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private final Map<String, ActorSelection> followerToActor = new HashMap<>();
 
-    private Cancellable heartbeatCancel = null;
+    private Cancellable heartbeatSchedule = null;
+    private Cancellable appendEntriesSchedule = null;
+    private Cancellable installSnapshotSchedule = null;
 
     private List<ClientRequestTracker> trackerList = new ArrayList<>();
 
@@ -103,6 +109,10 @@ public class Leader extends AbstractRaftActorBehavior {
         // prevent election timeouts (ยง5.2)
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
 
+        scheduleInstallSnapshotCheck(
+            new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
+                HEART_BEAT_INTERVAL.unit())
+        );
 
     }
 
@@ -194,8 +204,14 @@ public class Leader extends AbstractRaftActorBehavior {
         try {
             if (message instanceof SendHeartBeat) {
                 return sendHeartBeat();
+            } else if(message instanceof SendInstallSnapshot) {
+                installSnapshotIfNeeded();
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
+            } else if (message instanceof InstallSnapshotReply){
+                // FIXME : Should I be checking the term here too?
+                handleInstallSnapshotReply(
+                    (InstallSnapshotReply) message);
             }
         } finally {
             scheduleHeartBeat(HEART_BEAT_INTERVAL);
@@ -204,6 +220,18 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshotReply(InstallSnapshotReply message) {
+        InstallSnapshotReply reply = message;
+        String followerId = reply.getFollowerId();
+        FollowerLogInformation followerLogInformation =
+            followerToLog.get(followerId);
+
+        followerLogInformation
+            .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
+        followerLogInformation
+            .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+    }
+
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
@@ -244,8 +272,12 @@ public class Leader extends AbstractRaftActorBehavior {
 
             long nextIndex = followerLogInformation.getNextIndex().get();
 
-            List<ReplicatedLogEntry> entries =
-                context.getReplicatedLog().getFrom(nextIndex);
+            List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+            if(context.getReplicatedLog().isPresent(nextIndex)){
+                entries =
+                    context.getReplicatedLog().getFrom(nextIndex);
+            }
 
             followerActor.tell(
                 new AppendEntries(currentTerm(), context.getId(),
@@ -257,6 +289,29 @@ public class Leader extends AbstractRaftActorBehavior {
         }
     }
 
+    private void installSnapshotIfNeeded(){
+        for (String followerId : followerToActor.keySet()) {
+            ActorSelection followerActor =
+                followerToActor.get(followerId);
+
+            FollowerLogInformation followerLogInformation =
+                followerToLog.get(followerId);
+
+            long nextIndex = followerLogInformation.getNextIndex().get();
+
+            if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm(),
+                        context.getReplicatedLog().getSnapshot()
+                    ),
+                    actor()
+                );
+            }
+        }
+    }
+
     private RaftState sendHeartBeat() {
         if (followerToActor.size() > 0) {
             sendAppendEntries();
@@ -265,8 +320,14 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     private void stopHeartBeat() {
-        if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
-            heartbeatCancel.cancel();
+        if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+            heartbeatSchedule.cancel();
+        }
+    }
+
+    private void stopInstallSnapshotSchedule() {
+        if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+            installSnapshotSchedule.cancel();
         }
     }
 
@@ -284,13 +345,34 @@ public class Leader extends AbstractRaftActorBehavior {
         // Scheduling the heartbeat only once here because heartbeats do not
         // need to be sent if there are other messages being sent to the remote
         // actor.
-        heartbeatCancel =
+        heartbeatSchedule =
             context.getActorSystem().scheduler().scheduleOnce(
                 interval,
                 context.getActor(), new SendHeartBeat(),
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
+
+    private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+        if(followerToActor.keySet().size() == 0){
+            // Optimization - do not bother scheduling a heartbeat as there are
+            // no followers
+            return;
+        }
+
+        stopInstallSnapshotSchedule();
+
+        // Schedule a message to send append entries to followers that can
+        // accept an append entries with some data in it
+        installSnapshotSchedule =
+            context.getActorSystem().scheduler().scheduleOnce(
+                interval,
+                context.getActor(), new SendInstallSnapshot(),
+                context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+
+
     @Override public void close() throws Exception {
         stopHeartBeat();
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java
new file mode 100644 (file)
index 0000000..a7172e2
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+public class ApplySnapshot {
+    private final Object snapshot;
+
+    public ApplySnapshot(Object snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    public Object getSnapshot() {
+        return snapshot;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java
new file mode 100644 (file)
index 0000000..0c370aa
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+public class SendInstallSnapshot {
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
new file mode 100644 (file)
index 0000000..888854f
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class InstallSnapshot extends AbstractRaftRPC {
+
+    private final String leaderId;
+    private final long lastIncludedIndex;
+    private final long lastIncludedTerm;
+    private final Object data;
+
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+        super(term);
+        this.leaderId = leaderId;
+        this.lastIncludedIndex = lastIncludedIndex;
+        this.lastIncludedTerm = lastIncludedTerm;
+        this.data = data;
+    }
+
+    public String getLeaderId() {
+        return leaderId;
+    }
+
+    public long getLastIncludedIndex() {
+        return lastIncludedIndex;
+    }
+
+    public long getLastIncludedTerm() {
+        return lastIncludedTerm;
+    }
+
+    public Object getData() {
+        return data;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
new file mode 100644 (file)
index 0000000..85b89b7
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class InstallSnapshotReply extends AbstractRaftRPC {
+
+    // The followerId - this will be used to figure out which follower is
+    // responding
+    private final String followerId;
+
+    protected InstallSnapshotReply(long term, String followerId) {
+        super(term);
+        this.followerId = followerId;
+    }
+
+    public String getFollowerId() {
+        return followerId;
+    }
+}
index addf51a..b9c1278 100644 (file)
@@ -50,10 +50,8 @@ public class MockRaftActorContext implements RaftActorContext {
 
 
     public void initReplicatedLog(){
-        MockReplicatedLog mockReplicatedLog = new MockReplicatedLog();
-        this.replicatedLog = mockReplicatedLog;
-        mockReplicatedLog.setLast(new MockReplicatedLogEntry(1,1,""));
-        mockReplicatedLog.setReplicatedLogEntry(new MockReplicatedLogEntry(1,1, ""));
+        this.replicatedLog = new SimpleReplicatedLog();
+        this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, ""));
     }
 
     @Override public ActorRef actorOf(Props props) {
@@ -125,57 +123,6 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
 
-    public static class MockReplicatedLog implements ReplicatedLog {
-        private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
-        private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, "");
-
-        @Override public ReplicatedLogEntry get(long index) {
-            return replicatedLogEntry;
-        }
-
-        @Override public ReplicatedLogEntry last() {
-            return last;
-        }
-
-        @Override public long lastIndex() {
-            return last.getIndex();
-        }
-
-        @Override public long lastTerm() {
-            return last.getTerm();
-        }
-
-        @Override public void removeFrom(long index) {
-        }
-
-        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
-        }
-
-        @Override public void appendAndPersist(
-            ReplicatedLogEntry replicatedLogEntry) {
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            return Collections.EMPTY_LIST;
-        }
-
-        @Override public long size() {
-            if(replicatedLogEntry != null){
-                return 1;
-            }
-            return 0;
-        }
-
-        public void setReplicatedLogEntry(
-            ReplicatedLogEntry replicatedLogEntry) {
-            this.replicatedLogEntry = replicatedLogEntry;
-        }
-
-        public void setLast(ReplicatedLogEntry last) {
-            this.last = last;
-        }
-    }
-
     public static class SimpleReplicatedLog implements ReplicatedLog {
         private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
 
@@ -210,6 +157,9 @@ public class MockRaftActorContext implements RaftActorContext {
         }
 
         @Override public void removeFrom(long index) {
+            if(index >= log.size() || index < 0){
+                return;
+            }
             for(int i=(int) index ; i < log.size() ; i++) {
                 log.remove(i);
             }
@@ -225,6 +175,9 @@ public class MockRaftActorContext implements RaftActorContext {
         }
 
         @Override public List<ReplicatedLogEntry> getFrom(long index) {
+            if(index >= log.size() || index < 0){
+                return Collections.EMPTY_LIST;
+            }
             List<ReplicatedLogEntry> entries = new ArrayList<>();
             for(int i=(int) index ; i < log.size() ; i++) {
                 entries.add(get(i));
@@ -235,6 +188,30 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public long size() {
             return log.size();
         }
+
+        @Override public boolean isPresent(long index) {
+            if(index >= log.size() || index < 0){
+                return false;
+            }
+
+            return true;
+        }
+
+        @Override public boolean isInSnapshot(long index) {
+            return false;
+        }
+
+        @Override public Object getSnapshot() {
+            return null;
+        }
+
+        @Override public long getSnapshotIndex() {
+            return -1;
+        }
+
+        @Override public long getSnapshotTerm() {
+            return -1;
+        }
     }
 
     public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
index 273342e..a7b6825 100644 (file)
@@ -211,15 +211,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                     RaftActorContext actorContext =
                         createActorContext(behaviorActor);
 
-                    MockRaftActorContext.MockReplicatedLog
-                        log = new MockRaftActorContext.MockReplicatedLog();
-                    log.setReplicatedLogEntry(
+                    MockRaftActorContext.SimpleReplicatedLog
+                        log = new MockRaftActorContext.SimpleReplicatedLog();
+                    log.append(
                         new MockRaftActorContext.MockReplicatedLogEntry(20000,
                             1000000, ""));
-                    log.setLast(
-                        new MockRaftActorContext.MockReplicatedLogEntry(20000,
-                            1000000, "")
-                    );
 
                     ((MockRaftActorContext) actorContext).setReplicatedLog(log);
 
@@ -307,18 +303,17 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         assertEquals(RaftState.Follower, raftState);
     }
 
-    protected MockRaftActorContext.MockReplicatedLog setLastLogEntry(
+    protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
         MockRaftActorContext actorContext, long term, long index, Object data) {
         return setLastLogEntry(actorContext,
             new MockRaftActorContext.MockReplicatedLogEntry(term, index, data));
     }
 
-    protected MockRaftActorContext.MockReplicatedLog setLastLogEntry(
+    protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
         MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) {
-        MockRaftActorContext.MockReplicatedLog
-            log = new MockRaftActorContext.MockReplicatedLog();
-        // By default MockReplicateLog has last entry set to (1,1,"")
-        log.setLast(logEntry);
+        MockRaftActorContext.SimpleReplicatedLog
+            log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(logEntry);
         actorContext.setReplicatedLog(log);
 
         return log;
index ca0e13d..6774424 100644 (file)
@@ -190,14 +190,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
             // Set the last log entry term for the receiver to be greater than
             // what we will be sending as the prevLogTerm in AppendEntries
-            MockRaftActorContext.MockReplicatedLog mockReplicatedLog =
+            MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
                 setLastLogEntry(context, 20, 0, "");
 
-            // Also set the entry at index 0 with term 20 which will be greater
-            // than the prevLogTerm sent by the sender
-            mockReplicatedLog.setReplicatedLogEntry(
-                new MockRaftActorContext.MockReplicatedLogEntry(20, 0, ""));
-
             // AppendEntries is now sent with a bigger term
             // this will set the receivers term to be the same as the sender's term
             AppendEntries appendEntries =

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.