From 5924885ac74b5fa0c729004a5b66b30654a55496 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Wed, 16 Jul 2014 09:17:32 -0700 Subject: [PATCH] Initial implementation of saving and installing snapshots This commit basically sketches out how creating and installing snapshots would fit into the overall design. This has not been tested yet. Change-Id: I0c575049e8dbda6b63e49dab80ff0980546029d3 Signed-off-by: Moiz Raja --- .../cluster/example/ExampleActor.java | 9 + .../controller/cluster/example/Main.java | 36 +- .../controller/cluster/raft/RaftActor.java | 402 ++++++++++++++---- .../cluster/raft/ReplicatedLog.java | 48 ++- .../cluster/raft/behaviors/Follower.java | 5 + .../cluster/raft/behaviors/Leader.java | 94 +++- .../raft/internal/messages/ApplySnapshot.java | 21 + .../messages/SendInstallSnapshot.java | 12 + .../raft/messages/InstallSnapshot.java | 41 ++ .../raft/messages/InstallSnapshotReply.java | 25 ++ .../cluster/raft/MockRaftActorContext.java | 87 ++-- .../AbstractRaftActorBehaviorTest.java | 21 +- .../cluster/raft/behaviors/FollowerTest.java | 7 +- 13 files changed, 638 insertions(+), 170 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 35a2c98bd4..90cc247709 100644 --- 
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -65,6 +65,15 @@ public class ExampleActor extends RaftActor { } } + @Override protected Object createSnapshot() { + return state; + } + + @Override protected void applySnapshot(Object snapshot) { + state.clear(); + state.putAll((HashMap) snapshot); + } + @Override public void onReceiveRecover(Object message) { super.onReceiveRecover(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java index 27d083f2b3..a148ed4009 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java @@ -10,11 +10,14 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import org.opendaylight.controller.cluster.example.messages.KeyValue; import java.io.BufferedReader; import java.io.InputStreamReader; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; public class Main { @@ -41,14 +44,43 @@ public class Main { actorSystem.actorOf(ExampleActor.props("example-3", withoutPeer("example-3")), "example-3"); + + List examples = Arrays.asList(example1Actor, example2Actor, example3Actor); + ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor)); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + System.out.println("Usage :"); + System.out.println("s <1-3> to start a peer"); + System.out.println("k <1-3> to kill a peer"); + while(true) { - 
System.out.print("Enter Integer (0 to exit):"); + System.out.print("Enter command (0 to exit):"); try { - int i = Integer.parseInt(br.readLine()); + String s = br.readLine(); + String[] split = s.split(" "); + if(split.length > 1) { + String command = split[0]; + String actor = split[1]; + + if ("k".equals(command)) { + int i = Integer.parseInt(actor); + examples.get(i - 1) + .tell(PoisonPill.getInstance(), null); + continue; + } else if ("s".equals(command)) { + int i = Integer.parseInt(actor); + String actorName = "example-" + i; + examples.add(i - 1, + actorSystem.actorOf(ExampleActor.props(actorName, + withoutPeer(actorName)), actorName)); + System.out.println("Created actor : " + actorName); + continue; + } + } + + int i = Integer.parseInt(s); if(i == 0){ System.exit(0); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 826faf7414..f38ef18973 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -14,6 +14,10 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -21,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import 
org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; @@ -33,50 +38,48 @@ import java.util.Map; * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper * - * In Search of an Understandable Consensus Algorithm - *

+ * In Search of an Understandable Consensus Algorithm + *

* RaftActor has 3 states and each state has a certain behavior associated * with it. A Raft actor can behave as, *

    - *
  • A Leader
  • - *
  • A Follower (or)
  • - *
  • A Candidate
  • + *
  • A Leader
  • + *
  • A Follower (or)
  • + *
  • A Candidate
  • *
- * - *

+ *

+ *

* A RaftActor MUST be a Leader in order to accept requests from clients to * change the state of it's encapsulated state machine. Once a RaftActor becomes * a Leader it is also responsible for ensuring that all followers ultimately * have the same log and therefore the same state machine as itself. - * - *

+ *

+ *

* The current behavior of a RaftActor determines how election for leadership * is initiated and how peer RaftActors react to request for votes. - * - *

+ *

+ *

* Each RaftActor also needs to know the current election term. It uses this * information for a couple of things. One is to simply figure out who it * voted for in the last election. Another is to figure out if the message * it received to update it's state is stale. - * - *

+ *

+ *

* The RaftActor uses akka-persistence to store it's replicated log. * Furthermore through it's behaviors a Raft Actor determines - * + *

*

    *
  • when a log entry should be persisted
  • *
  • when a log entry should be applied to the state machine (and)
  • *
  • when a snapshot should be saved
  • *
- * - * UntypeEventSourceProcessor */ public abstract class RaftActor extends UntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); /** - * The current state determines the current behavior of a RaftActor + * The current state determines the current behavior of a RaftActor * A Raft Actor always starts off in the Follower State */ private RaftActorBehavior currentBehavior; @@ -93,36 +96,61 @@ public abstract class RaftActor extends UntypedPersistentActor { private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); - - public RaftActor(String id, Map peerAddresses){ + public RaftActor(String id, Map peerAddresses) { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(getSelf().path().toString()), -1, -1, replicatedLog, peerAddresses, LOG); - currentBehavior = switchBehavior(RaftState.Follower); } @Override public void onReceiveRecover(Object message) { - if(message instanceof ReplicatedLogEntry) { + if (message instanceof SnapshotOffer) { + SnapshotOffer offer = (SnapshotOffer) message; + Snapshot snapshot = (Snapshot) offer.snapshot(); + + // Create a replicated log with the snapshot information + // The replicated log can be used later on to retrieve this snapshot + // when we need to install it on a peer + replicatedLog = new ReplicatedLogImpl(snapshot); + + // Apply the snapshot to the actors state + applySnapshot(snapshot.getState()); + + } else if (message instanceof ReplicatedLogEntry) { replicatedLog.append((ReplicatedLogEntry) message); - } else if(message instanceof RecoveryCompleted){ - LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex()); + } else if (message instanceof RecoveryCompleted) { + LOG.debug( + "Last index in log : " + replicatedLog.lastIndex()); + currentBehavior = switchBehavior(RaftState.Follower); } } @Override public void onReceiveCommand(Object message) { - if(message instanceof ApplyState){ + if (message 
instanceof ApplyState) { - ApplyState applyState = (ApplyState) message; + ApplyState applyState = (ApplyState) message; - LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex()); + LOG.debug("Applying state for log index {}", + applyState.getReplicatedLogEntry().getIndex()); applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); - } else if(message instanceof FindLeader){ - getSender().tell(new FindLeaderReply( - context.getPeerAddress(currentBehavior.getLeaderId())), - getSelf()); + } else if(message instanceof ApplySnapshot ) { + applySnapshot(((ApplySnapshot) message).getSnapshot()); + } else if (message instanceof FindLeader) { + getSender().tell( + new FindLeaderReply( + context.getPeerAddress(currentBehavior.getLeaderId())), + getSelf() + ); + } else if (message instanceof SaveSnapshotSuccess) { + SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; + + // TODO: Not sure if we want to be this aggressive with trimming stuff + trimPersistentData(success.metadata().sequenceNr()); + + } else if (message instanceof SaveSnapshotFailure) { + // TODO: Handle failure in saving the snapshot } else { RaftState state = currentBehavior.handleMessage(getSender(), message); @@ -130,33 +158,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } } - private RaftActorBehavior switchBehavior(RaftState state){ - if(currentBehavior != null) { - if (currentBehavior.state() == state) { - return currentBehavior; - } - LOG.info("Switching from state " + currentBehavior.state() + " to " - + state); - - try { - currentBehavior.close(); - } catch (Exception e) { - LOG.error(e, "Failed to close behavior : " + currentBehavior.state()); - } - } else { - LOG.info("Switching behavior to " + state); - } - RaftActorBehavior behavior = null; - if(state == RaftState.Candidate){ - behavior = new Candidate(context); - } else if(state == RaftState.Follower){ - behavior = new 
Follower(context); - } else { - behavior = new Leader(context); - } - return behavior; - } /** * When a derived RaftActor needs to persist something it must call @@ -166,55 +168,170 @@ public abstract class RaftActor extends UntypedPersistentActor { * @param identifier * @param data */ - protected void persistData(ActorRef clientActor, String identifier, Object data){ + protected void persistData(ActorRef clientActor, String identifier, + Object data) { LOG.debug("Persist data " + identifier); ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); - replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry); + replicatedLog + .appendAndPersist(clientActor, identifier, replicatedLogEntry); } - protected abstract void applyState(ActorRef clientActor, String identifier, Object data); - - protected String getId(){ + protected String getId() { return context.getId(); } - protected boolean isLeader(){ + /** + * Derived actors can call the isLeader method to check if the current + * RaftActor is the Leader or not + * + * @return true it this RaftActor is a Leader false otherwise + */ + protected boolean isLeader() { return context.getId().equals(currentBehavior.getLeaderId()); } - protected ActorSelection getLeader(){ + /** + * Derived actor can call getLeader if they need a reference to the Leader. 
+ * This would be useful for example in forwarding a request to an actor + * which is the leader + * + * @return A reference to the leader if known, null otherwise + */ + protected ActorSelection getLeader() { String leaderId = currentBehavior.getLeaderId(); + if (leaderId == null) { + return null; + } String peerAddress = context.getPeerAddress(leaderId); - LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress); + LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + + peerAddress); return context.actorSelection(peerAddress); } + /** + * The applyState method will be called by the RaftActor when some data + * needs to be applied to the actor's state + * + * @param clientActor A reference to the client who sent this message. This + * is the same reference that was passed to persistData + * by the derived actor. clientActor may be null when + * the RaftActor is behaving as a follower or during + * recovery. + * @param identifier The identifier of the persisted data. This is also + * the same identifier that was passed to persistData by + * the derived actor. identifier may be null when + * the RaftActor is behaving as a follower or during + * recovery + * @param data A piece of data that was persisted by the persistData call. + * This should NEVER be null. + */ + protected abstract void applyState(ActorRef clientActor, String identifier, + Object data); + + /** + * This method will be called by the RaftActor when a snapshot needs to be + * created. The derived actor should respond with its current state. + *

+ * During recovery the state that is returned by the derived actor will + * be passed back to it by calling the applySnapshot method + * + * @return The current state of the actor + */ + protected abstract Object createSnapshot(); + + /** + * This method will be called by the RaftActor during recovery to + * reconstruct the state of the actor. + *

+ * This method may also be called at any other point during normal + * operations when the derived actor is out of sync with it's peers + * and the only way to bring it in sync is by applying a snapshot + * + * @param snapshot A snapshot of the state of the actor + */ + protected abstract void applySnapshot(Object snapshot); + + private RaftActorBehavior switchBehavior(RaftState state) { + if (currentBehavior != null) { + if (currentBehavior.state() == state) { + return currentBehavior; + } + LOG.info("Switching from state " + currentBehavior.state() + " to " + + state); + + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.error(e, + "Failed to close behavior : " + currentBehavior.state()); + } + + } else { + LOG.info("Switching behavior to " + state); + } + RaftActorBehavior behavior = null; + if (state == RaftState.Candidate) { + behavior = new Candidate(context); + } else if (state == RaftState.Follower) { + behavior = new Follower(context); + } else { + behavior = new Leader(context); + } + return behavior; + } + + private void trimPersistentData(long sequenceNumber) { + // Trim snapshots + // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied + // For now guessing that it is ANDed. 
+ deleteSnapshots(new SnapshotSelectionCriteria( + sequenceNumber - 100000, 43200000)); + + // Trim journal + deleteMessages(sequenceNumber); + } + private class ReplicatedLogImpl implements ReplicatedLog { - private final List journal = new ArrayList(); - private long snapshotIndex = 0; - private Object snapShot = null; + private final List journal; + private final Object snapshot; + private long snapshotIndex = -1; + private long snapshotTerm = -1; + + public ReplicatedLogImpl(Snapshot snapshot) { + this.snapshot = snapshot.getState(); + this.snapshotIndex = snapshot.getLastAppliedIndex(); + this.snapshotTerm = snapshot.getLastAppliedTerm(); + + this.journal = new ArrayList<>(snapshot.getUnAppliedEntries()); + } + public ReplicatedLogImpl() { + this.snapshot = null; + this.journal = new ArrayList<>(); + } @Override public ReplicatedLogEntry get(long index) { - if(index < 0 || index >= journal.size()){ + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return null; } - return journal.get((int) (index - snapshotIndex)); + return journal.get(adjustedIndex); } @Override public ReplicatedLogEntry last() { - if(journal.size() == 0){ + if (journal.size() == 0) { return null; } return get(journal.size() - 1); } @Override public long lastIndex() { - if(journal.size() == 0){ + if (journal.size() == 0) { return -1; } @@ -222,7 +339,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public long lastTerm() { - if(journal.size() == 0){ + if (journal.size() == 0) { return -1; } @@ -231,56 +348,124 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void removeFrom(long index) { - if(index < 0 || index >= journal.size()){ + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return; } - for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){ + for (int i = adjustedIndex; + i < journal.size(); 
i++) { deleteMessage(i); journal.remove(i); } } - @Override public void append(final ReplicatedLogEntry replicatedLogEntry) { + @Override public void append( + final ReplicatedLogEntry replicatedLogEntry) { journal.add(replicatedLogEntry); } @Override public List getFrom(long index) { + int adjustedIndex = adjustedIndex(index); + List entries = new ArrayList<>(100); - if(index < 0 || index >= journal.size()){ + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return entries; } - for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){ + for (int i = adjustedIndex; + i < journal.size(); i++) { entries.add(journal.get(i)); } return entries; } - @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){ + @Override public void appendAndPersist( + final ReplicatedLogEntry replicatedLogEntry) { appendAndPersist(null, null, replicatedLogEntry); } - public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){ - context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex()); + public void appendAndPersist(final ActorRef clientActor, + final String identifier, + final ReplicatedLogEntry replicatedLogEntry) { + context.getLogger().debug( + "Append log entry and persist " + replicatedLogEntry.getIndex()); // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs journal.add(replicatedLogEntry); + + // When persisting events with persist it is guaranteed that the + // persistent actor will not receive further commands between the + // persist call and the execution(s) of the associated event + // handler. This also holds for multiple persist calls in context + // of a single command. persist(replicatedLogEntry, new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { + // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. 
+ if (size() > 100000) { + ReplicatedLogEntry lastAppliedEntry = + get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } + + saveSnapshot(Snapshot.create(createSnapshot(), + getFrom(context.getLastApplied() + 1), + lastIndex(), lastTerm(), lastAppliedIndex, + lastAppliedTerm)); + } // Send message for replication - if(clientActor != null) { + if (clientActor != null) { currentBehavior.handleMessage(getSelf(), new Replicate(clientActor, identifier, - replicatedLogEntry)); + replicatedLogEntry) + ); } } - }); + } + ); } @Override public long size() { return journal.size() + snapshotIndex; } + + @Override public boolean isPresent(long index) { + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return false; + } + return true; + } + + @Override public boolean isInSnapshot(long index) { + return index <= snapshotIndex; + } + + @Override public Object getSnapshot() { + return snapshot; + } + + @Override public long getSnapshotIndex() { + return snapshotIndex; + } + + @Override public long getSnapshotTerm() { + return snapshotTerm; + } + + private int adjustedIndex(long index) { + if(snapshotIndex < 0){ + return (int) index; + } + return (int) (index - snapshotIndex); + } } + private static class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable { @@ -288,7 +473,7 @@ public abstract class RaftActor extends UntypedPersistentActor { private final long term; private final Object payload; - public ReplicatedLogImplEntry(long index, long term, Object payload){ + public ReplicatedLogImplEntry(long index, long term, Object payload) { this.index = index; this.term = term; @@ -309,4 +494,53 @@ public abstract class RaftActor extends UntypedPersistentActor { } + private static class Snapshot implements Serializable { + private final 
Object state; + private final List unAppliedEntries; + private final long lastIndex; + private final long lastTerm; + private final long lastAppliedIndex; + private final long lastAppliedTerm; + + private Snapshot(Object state, + List unAppliedEntries, long lastIndex, + long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { + this.state = state; + this.unAppliedEntries = unAppliedEntries; + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + } + + + public static Snapshot create(Object state, + List entries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm) { + return new Snapshot(state, entries, lastIndex, lastTerm, + lastAppliedIndex, lastAppliedTerm); + } + + public Object getState() { + return state; + } + + public List getUnAppliedEntries() { + return unAppliedEntries; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 34e7ac9768..3de0de5131 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -17,8 +17,9 @@ public interface ReplicatedLog { /** * Get a replicated log entry at the specified index * - * @param index - * @return + * @param index the index of the log entry + * @return the ReplicatedLogEntry at index. null if index less than 0 or + * greater than the size of the in-memory journal. 
*/ ReplicatedLogEntry get(long index); @@ -45,7 +46,7 @@ public interface ReplicatedLog { /** * Remove all the entries from the logs >= index * - * @param index + * @param index the index of the log entry */ void removeFrom(long index); @@ -63,7 +64,7 @@ public interface ReplicatedLog { /** * - * @param index + * @param index the index of the log entry */ List getFrom(long index); @@ -73,4 +74,43 @@ public interface ReplicatedLog { * @return */ long size(); + + /** + * Checks if the entry at the specified index is present or not + * + * @param index the index of the log entry + * @return true if the entry is present in the in-memory journal + */ + boolean isPresent(long index); + + /** + * Checks if the entry is present in a snapshot + * + * @param index the index of the log entry + * @return true if the entry is in the snapshot. false if the entry is not + * in the snapshot even if the entry may be present in the replicated log + */ + boolean isInSnapshot(long index); + + /** + * Get the snapshot + * + * @return an object representing the snapshot if it exists. null otherwise + */ + Object getSnapshot(); + + /** + * Get the index of the snapshot + * + * @return the index from which the snapshot was created. -1 otherwise. + */ + long getSnapshotIndex(); + + /** + * Get the term of the snapshot + * + * @return the term of the index from which the snapshot was created. 
-1 + * otherwise + */ + long getSnapshotTerm(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 88558cac16..1cfc2e0eb9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -12,9 +12,11 @@ import akka.actor.ActorRef; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; /** @@ -165,6 +167,9 @@ public class Follower extends AbstractRaftActorBehavior { @Override public RaftState handleMessage(ActorRef sender, Object message) { if(message instanceof ElectionTimeout){ return RaftState.Candidate; + } else if(message instanceof InstallSnapshot){ + InstallSnapshot snapshot = (InstallSnapshot) message; + actor().tell(new ApplySnapshot(snapshot), actor()); } scheduleElection(electionDuration()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index ec4551ac4f..90edf7da9a 100644 --- 
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -22,12 +22,16 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,7 +68,9 @@ public class Leader extends AbstractRaftActorBehavior { private final Map followerToActor = new HashMap<>(); - private Cancellable heartbeatCancel = null; + private Cancellable heartbeatSchedule = null; + private Cancellable appendEntriesSchedule = null; + private Cancellable installSnapshotSchedule = null; private List trackerList = new ArrayList<>(); @@ -103,6 +109,10 @@ public class Leader extends AbstractRaftActorBehavior { // prevent election timeouts (§5.2) scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + scheduleInstallSnapshotCheck( + new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, + HEART_BEAT_INTERVAL.unit()) + ); } @@ -194,8 +204,14 @@ public class Leader extends AbstractRaftActorBehavior { try { if (message instanceof SendHeartBeat) { 
return sendHeartBeat(); + } else if(message instanceof SendInstallSnapshot) { + installSnapshotIfNeeded(); } else if (message instanceof Replicate) { replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ + // FIXME : Should I be checking the term here too? + handleInstallSnapshotReply( + (InstallSnapshotReply) message); } } finally { scheduleHeartBeat(HEART_BEAT_INTERVAL); @@ -204,6 +220,18 @@ public class Leader extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } + private void handleInstallSnapshotReply(InstallSnapshotReply message) { + InstallSnapshotReply reply = message; + String followerId = reply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + followerLogInformation + .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation + .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } + private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); @@ -244,8 +272,12 @@ public class Leader extends AbstractRaftActorBehavior { long nextIndex = followerLogInformation.getNextIndex().get(); - List entries = - context.getReplicatedLog().getFrom(nextIndex); + List entries = Collections.emptyList(); + + if(context.getReplicatedLog().isPresent(nextIndex)){ + entries = + context.getReplicatedLog().getFrom(nextIndex); + } followerActor.tell( new AppendEntries(currentTerm(), context.getId(), @@ -257,6 +289,29 @@ public class Leader extends AbstractRaftActorBehavior { } } + private void installSnapshotIfNeeded(){ + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + if(!context.getReplicatedLog().isPresent(nextIndex) && 
context.getReplicatedLog().isInSnapshot(nextIndex)){ + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().getSnapshot() + ), + actor() + ); + } + } + } + private RaftState sendHeartBeat() { if (followerToActor.size() > 0) { sendAppendEntries(); @@ -265,8 +320,14 @@ public class Leader extends AbstractRaftActorBehavior { } private void stopHeartBeat() { - if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { - heartbeatCancel.cancel(); + if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) { + heartbeatSchedule.cancel(); + } + } + + private void stopInstallSnapshotSchedule() { + if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { + installSnapshotSchedule.cancel(); } } @@ -284,13 +345,34 @@ public class Leader extends AbstractRaftActorBehavior { // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. 
- heartbeatCancel = + heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( interval, context.getActor(), new SendHeartBeat(), context.getActorSystem().dispatcher(), context.getActor()); } + + private void scheduleInstallSnapshotCheck(FiniteDuration interval) { + if(followerToActor.keySet().size() == 0){ + // Optimization - do not bother scheduling an install snapshot check + // as there are no followers + return; + } + + stopInstallSnapshotSchedule(); + + // Schedule a message that makes the leader send an InstallSnapshot to + // followers whose next index is only available in the snapshot + installSnapshotSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new SendInstallSnapshot(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + + @Override public void close() throws Exception { stopHeartBeat(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java new file mode 100644 index 0000000000..a7172e22c8 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +public class ApplySnapshot { + private final Object snapshot; + + public ApplySnapshot(Object snapshot) { + this.snapshot = snapshot; + } + + public Object getSnapshot() { + return snapshot; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java new file mode 100644 index 0000000000..0c370aa981 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +public class SendInstallSnapshot { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java new file mode 100644 index 0000000000..888854fa71 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class InstallSnapshot extends AbstractRaftRPC { + + private final String leaderId; + private final long lastIncludedIndex; + private final long lastIncludedTerm; + private final Object data; + + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) { + super(term); + this.leaderId = leaderId; + this.lastIncludedIndex = lastIncludedIndex; + this.lastIncludedTerm = lastIncludedTerm; + this.data = data; + } + + public String getLeaderId() { + return leaderId; + } + + public long getLastIncludedIndex() { + return lastIncludedIndex; + } + + public long getLastIncludedTerm() { + return lastIncludedTerm; + } + + public Object getData() { + return data; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java new file mode 100644 index 0000000000..85b89b70ae --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class InstallSnapshotReply extends AbstractRaftRPC { + + // The followerId - this will be used to figure out which follower is + // responding + private final String followerId; + + protected InstallSnapshotReply(long term, String followerId) { + super(term); + this.followerId = followerId; + } + + public String getFollowerId() { + return followerId; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index addf51a63c..b9c1278b8e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -50,10 +50,8 @@ public class MockRaftActorContext implements RaftActorContext { public void initReplicatedLog(){ - MockReplicatedLog mockReplicatedLog = new MockReplicatedLog(); - this.replicatedLog = mockReplicatedLog; - mockReplicatedLog.setLast(new MockReplicatedLogEntry(1,1,"")); - mockReplicatedLog.setReplicatedLogEntry(new MockReplicatedLogEntry(1,1, "")); + this.replicatedLog = new SimpleReplicatedLog(); + this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, "")); } @Override public ActorRef actorOf(Props props) { @@ -125,57 +123,6 @@ public class MockRaftActorContext implements RaftActorContext { } - public static class MockReplicatedLog implements ReplicatedLog { - private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, ""); - private ReplicatedLogEntry last 
= new MockReplicatedLogEntry(0,0, ""); - - @Override public ReplicatedLogEntry get(long index) { - return replicatedLogEntry; - } - - @Override public ReplicatedLogEntry last() { - return last; - } - - @Override public long lastIndex() { - return last.getIndex(); - } - - @Override public long lastTerm() { - return last.getTerm(); - } - - @Override public void removeFrom(long index) { - } - - @Override public void append(ReplicatedLogEntry replicatedLogEntry) { - } - - @Override public void appendAndPersist( - ReplicatedLogEntry replicatedLogEntry) { - } - - @Override public List getFrom(long index) { - return Collections.EMPTY_LIST; - } - - @Override public long size() { - if(replicatedLogEntry != null){ - return 1; - } - return 0; - } - - public void setReplicatedLogEntry( - ReplicatedLogEntry replicatedLogEntry) { - this.replicatedLogEntry = replicatedLogEntry; - } - - public void setLast(ReplicatedLogEntry last) { - this.last = last; - } - } - public static class SimpleReplicatedLog implements ReplicatedLog { private final List log = new ArrayList<>(10000); @@ -210,6 +157,9 @@ public class MockRaftActorContext implements RaftActorContext { } @Override public void removeFrom(long index) { + if(index >= log.size() || index < 0){ + return; + } for(int i=(int) index ; i < log.size() ; i++) { log.remove(i); } @@ -225,6 +175,9 @@ public class MockRaftActorContext implements RaftActorContext { } @Override public List getFrom(long index) { + if(index >= log.size() || index < 0){ + return Collections.EMPTY_LIST; + } List entries = new ArrayList<>(); for(int i=(int) index ; i < log.size() ; i++) { entries.add(get(i)); @@ -235,6 +188,30 @@ public class MockRaftActorContext implements RaftActorContext { @Override public long size() { return log.size(); } + + @Override public boolean isPresent(long index) { + if(index >= log.size() || index < 0){ + return false; + } + + return true; + } + + @Override public boolean isInSnapshot(long index) { + return false; + } + + @Override 
public Object getSnapshot() { + return null; + } + + @Override public long getSnapshotIndex() { + return -1; + } + + @Override public long getSnapshotTerm() { + return -1; + } } public static class MockReplicatedLogEntry implements ReplicatedLogEntry { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 273342eb47..a7b6825c7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -211,15 +211,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { RaftActorContext actorContext = createActorContext(behaviorActor); - MockRaftActorContext.MockReplicatedLog - log = new MockRaftActorContext.MockReplicatedLog(); - log.setReplicatedLogEntry( + MockRaftActorContext.SimpleReplicatedLog + log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append( new MockRaftActorContext.MockReplicatedLogEntry(20000, 1000000, "")); - log.setLast( - new MockRaftActorContext.MockReplicatedLogEntry(20000, - 1000000, "") - ); ((MockRaftActorContext) actorContext).setReplicatedLog(log); @@ -307,18 +303,17 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { assertEquals(RaftState.Follower, raftState); } - protected MockRaftActorContext.MockReplicatedLog setLastLogEntry( + protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry( MockRaftActorContext actorContext, long term, long index, Object data) { return setLastLogEntry(actorContext, new MockRaftActorContext.MockReplicatedLogEntry(term, index, data)); } - protected MockRaftActorContext.MockReplicatedLog 
setLastLogEntry( + protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry( MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) { - MockRaftActorContext.MockReplicatedLog - log = new MockRaftActorContext.MockReplicatedLog(); - // By default MockReplicateLog has last entry set to (1,1,"") - log.setLast(logEntry); + MockRaftActorContext.SimpleReplicatedLog + log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(logEntry); actorContext.setReplicatedLog(log); return log; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index ca0e13db03..677442402c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -190,14 +190,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // Set the last log entry term for the receiver to be greater than // what we will be sending as the prevLogTerm in AppendEntries - MockRaftActorContext.MockReplicatedLog mockReplicatedLog = + MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog = setLastLogEntry(context, 20, 0, ""); - // Also set the entry at index 0 with term 20 which will be greater - // than the prevLogTerm sent by the sender - mockReplicatedLog.setReplicatedLogEntry( - new MockRaftActorContext.MockReplicatedLogEntry(20, 0, "")); - // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term AppendEntries appendEntries = -- 2.36.6