From: Moiz Raja Date: Wed, 16 Jul 2014 16:17:32 +0000 (-0700) Subject: Initial implementation of saving and installing snapshots X-Git-Tag: release/helium~435 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=5924885ac74b5fa0c729004a5b66b30654a55496 Initial implementation of saving and installing snapshots This commit basically sketches out how creating and installing snapshots would fit into the overall design. This has not been tested yet. Change-Id: I0c575049e8dbda6b63e49dab80ff0980546029d3 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 35a2c98bd4..90cc247709 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -65,6 +65,15 @@ public class ExampleActor extends RaftActor { } } + @Override protected Object createSnapshot() { + return state; + } + + @Override protected void applySnapshot(Object snapshot) { + state.clear(); + state.putAll((HashMap) snapshot); + } + @Override public void onReceiveRecover(Object message) { super.onReceiveRecover(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java index 27d083f2b3..a148ed4009 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java @@ -10,11 +10,14 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import org.opendaylight.controller.cluster.example.messages.KeyValue; import java.io.BufferedReader; import java.io.InputStreamReader; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; public class Main { @@ -41,14 +44,43 @@ public class Main { actorSystem.actorOf(ExampleActor.props("example-3", withoutPeer("example-3")), "example-3"); + + List examples = Arrays.asList(example1Actor, example2Actor, example3Actor); + ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor)); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + System.out.println("Usage :"); + System.out.println("s <1-3> to start a peer"); + System.out.println("k <1-3> to kill a peer"); + while(true) { - System.out.print("Enter Integer (0 to exit):"); + System.out.print("Enter command (0 to exit):"); try { - int i = Integer.parseInt(br.readLine()); + String s = br.readLine(); + String[] split = s.split(" "); + if(split.length > 1) { + String command = split[0]; + String actor = split[1]; + + if ("k".equals(command)) { + int i = Integer.parseInt(actor); + examples.get(i - 1) + .tell(PoisonPill.getInstance(), null); + continue; + } else if ("s".equals(command)) { + int i = Integer.parseInt(actor); + String actorName = "example-" + i; + examples.add(i - 1, + actorSystem.actorOf(ExampleActor.props(actorName, + withoutPeer(actorName)), actorName)); + System.out.println("Created actor : " + actorName); + continue; + } + } + + int i = Integer.parseInt(s); if(i == 0){ System.exit(0); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 826faf7414..f38ef18973 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -14,6 +14,10 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -21,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; @@ -33,50 +38,48 @@ import java.util.Map; * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper * - * In Search of an Understandable Consensus Algorithm - *

+ * In Search of an Understandable Consensus Algorithm + *

* RaftActor has 3 states and each state has a certain behavior associated * with it. A Raft actor can behave as, *

    - *
  • A Leader
  • - *
  • A Follower (or)
  • - *
  • A Candidate
  • + *
  • A Leader
  • + *
  • A Follower (or)
  • + *
  • A Candidate
  • *
- * - *

+ *

+ *

* A RaftActor MUST be a Leader in order to accept requests from clients to * change the state of it's encapsulated state machine. Once a RaftActor becomes * a Leader it is also responsible for ensuring that all followers ultimately * have the same log and therefore the same state machine as itself. - * - *

+ *

+ *

* The current behavior of a RaftActor determines how election for leadership * is initiated and how peer RaftActors react to request for votes. - * - *

+ *

+ *

* Each RaftActor also needs to know the current election term. It uses this * information for a couple of things. One is to simply figure out who it * voted for in the last election. Another is to figure out if the message * it received to update it's state is stale. - * - *

+ *

+ *

* The RaftActor uses akka-persistence to store it's replicated log. * Furthermore through it's behaviors a Raft Actor determines - * + *

*

    *
  • when a log entry should be persisted
  • *
  • when a log entry should be applied to the state machine (and)
  • *
  • when a snapshot should be saved
  • *
- * - * UntypeEventSourceProcessor */ public abstract class RaftActor extends UntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); /** - * The current state determines the current behavior of a RaftActor + * The current state determines the current behavior of a RaftActor * A Raft Actor always starts off in the Follower State */ private RaftActorBehavior currentBehavior; @@ -93,36 +96,61 @@ public abstract class RaftActor extends UntypedPersistentActor { private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); - - public RaftActor(String id, Map peerAddresses){ + public RaftActor(String id, Map peerAddresses) { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(getSelf().path().toString()), -1, -1, replicatedLog, peerAddresses, LOG); - currentBehavior = switchBehavior(RaftState.Follower); } @Override public void onReceiveRecover(Object message) { - if(message instanceof ReplicatedLogEntry) { + if (message instanceof SnapshotOffer) { + SnapshotOffer offer = (SnapshotOffer) message; + Snapshot snapshot = (Snapshot) offer.snapshot(); + + // Create a replicated log with the snapshot information + // The replicated log can be used later on to retrieve this snapshot + // when we need to install it on a peer + replicatedLog = new ReplicatedLogImpl(snapshot); + + // Apply the snapshot to the actors state + applySnapshot(snapshot.getState()); + + } else if (message instanceof ReplicatedLogEntry) { replicatedLog.append((ReplicatedLogEntry) message); - } else if(message instanceof RecoveryCompleted){ - LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex()); + } else if (message instanceof RecoveryCompleted) { + LOG.debug( + "Last index in log : " + replicatedLog.lastIndex()); + currentBehavior = switchBehavior(RaftState.Follower); } } @Override public void onReceiveCommand(Object message) { - if(message instanceof ApplyState){ + if (message instanceof ApplyState) { - ApplyState applyState = (ApplyState) message; + ApplyState applyState = (ApplyState) message; - LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex()); + LOG.debug("Applying state for log index {}", + applyState.getReplicatedLogEntry().getIndex()); applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); - } else if(message instanceof FindLeader){ - getSender().tell(new FindLeaderReply( - context.getPeerAddress(currentBehavior.getLeaderId())), - getSelf()); + } else if(message instanceof ApplySnapshot ) { + applySnapshot(((ApplySnapshot) message).getSnapshot()); + } else if (message instanceof FindLeader) { + getSender().tell( + new FindLeaderReply( + context.getPeerAddress(currentBehavior.getLeaderId())), + getSelf() + ); + } else if (message instanceof SaveSnapshotSuccess) { + SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; + + // TODO: Not sure if we want to be this aggressive with trimming stuff + trimPersistentData(success.metadata().sequenceNr()); + + } else if (message instanceof SaveSnapshotFailure) { + // TODO: Handle failure in saving the snapshot } else { RaftState state = currentBehavior.handleMessage(getSender(), message); @@ -130,33 +158,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } } - private RaftActorBehavior switchBehavior(RaftState state){ - if(currentBehavior != null) { - if (currentBehavior.state() == state) { - return currentBehavior; - } - LOG.info("Switching from state " + currentBehavior.state() + " to " - + state); - - try { - currentBehavior.close(); - } catch (Exception e) { - LOG.error(e, "Failed to close behavior : " + currentBehavior.state()); - } - } else { - LOG.info("Switching behavior to " + state); - } - RaftActorBehavior behavior = null; - if(state == RaftState.Candidate){ - behavior = new Candidate(context); - } else if(state == RaftState.Follower){ - behavior = new Follower(context); - } else { - behavior = new Leader(context); - } - return behavior; - } /** * When a derived RaftActor needs to persist something it must call @@ -166,55 +168,170 @@ public abstract class RaftActor extends UntypedPersistentActor { * @param identifier * @param data */ - protected void persistData(ActorRef clientActor, String identifier, Object data){ + protected void persistData(ActorRef clientActor, String identifier, + Object data) { LOG.debug("Persist data " + identifier); ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); - replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry); + replicatedLog + .appendAndPersist(clientActor, identifier, replicatedLogEntry); } - protected abstract void applyState(ActorRef clientActor, String identifier, Object data); - - protected String getId(){ + protected String getId() { return context.getId(); } - protected boolean isLeader(){ + /** + * Derived actors can call the isLeader method to check if the current + * RaftActor is the Leader or not + * + * @return true it this RaftActor is a Leader false otherwise + */ + protected boolean isLeader() { return context.getId().equals(currentBehavior.getLeaderId()); } - protected ActorSelection getLeader(){ + /** + * Derived actor can call getLeader if they need a reference to the Leader. + * This would be useful for example in forwarding a request to an actor + * which is the leader + * + * @return A reference to the leader if known, null otherwise + */ + protected ActorSelection getLeader() { String leaderId = currentBehavior.getLeaderId(); + if (leaderId == null) { + return null; + } String peerAddress = context.getPeerAddress(leaderId); - LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress); + LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + + peerAddress); return context.actorSelection(peerAddress); } + /** + * The applyState method will be called by the RaftActor when some data + * needs to be applied to the actor's state + * + * @param clientActor A reference to the client who sent this message. This + * is the same reference that was passed to persistData + * by the derived actor. clientActor may be null when + * the RaftActor is behaving as a follower or during + * recovery. + * @param identifier The identifier of the persisted data. This is also + * the same identifier that was passed to persistData by + * the derived actor. identifier may be null when + * the RaftActor is behaving as a follower or during + * recovery + * @param data A piece of data that was persisted by the persistData call. + * This should NEVER be null. + */ + protected abstract void applyState(ActorRef clientActor, String identifier, + Object data); + + /** + * This method will be called by the RaftActor when a snapshot needs to be + * created. The derived actor should respond with its current state. + *

+ * During recovery the state that is returned by the derived actor will + * be passed back to it by calling the applySnapshot method + * + * @return The current state of the actor + */ + protected abstract Object createSnapshot(); + + /** + * This method will be called by the RaftActor during recovery to + * reconstruct the state of the actor. + *

+ * This method may also be called at any other point during normal + * operations when the derived actor is out of sync with it's peers + * and the only way to bring it in sync is by applying a snapshot + * + * @param snapshot A snapshot of the state of the actor + */ + protected abstract void applySnapshot(Object snapshot); + + private RaftActorBehavior switchBehavior(RaftState state) { + if (currentBehavior != null) { + if (currentBehavior.state() == state) { + return currentBehavior; + } + LOG.info("Switching from state " + currentBehavior.state() + " to " + + state); + + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.error(e, + "Failed to close behavior : " + currentBehavior.state()); + } + + } else { + LOG.info("Switching behavior to " + state); + } + RaftActorBehavior behavior = null; + if (state == RaftState.Candidate) { + behavior = new Candidate(context); + } else if (state == RaftState.Follower) { + behavior = new Follower(context); + } else { + behavior = new Leader(context); + } + return behavior; + } + + private void trimPersistentData(long sequenceNumber) { + // Trim snapshots + // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied + // For now guessing that it is ANDed. + deleteSnapshots(new SnapshotSelectionCriteria( + sequenceNumber - 100000, 43200000)); + + // Trim journal + deleteMessages(sequenceNumber); + } + private class ReplicatedLogImpl implements ReplicatedLog { - private final List journal = new ArrayList(); - private long snapshotIndex = 0; - private Object snapShot = null; + private final List journal; + private final Object snapshot; + private long snapshotIndex = -1; + private long snapshotTerm = -1; + + public ReplicatedLogImpl(Snapshot snapshot) { + this.snapshot = snapshot.getState(); + this.snapshotIndex = snapshot.getLastAppliedIndex(); + this.snapshotTerm = snapshot.getLastAppliedTerm(); + + this.journal = new ArrayList<>(snapshot.getUnAppliedEntries()); + } + public ReplicatedLogImpl() { + this.snapshot = null; + this.journal = new ArrayList<>(); + } @Override public ReplicatedLogEntry get(long index) { - if(index < 0 || index >= journal.size()){ + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return null; } - return journal.get((int) (index - snapshotIndex)); + return journal.get(adjustedIndex); } @Override public ReplicatedLogEntry last() { - if(journal.size() == 0){ + if (journal.size() == 0) { return null; } return get(journal.size() - 1); } @Override public long lastIndex() { - if(journal.size() == 0){ + if (journal.size() == 0) { return -1; } @@ -222,7 +339,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public long lastTerm() { - if(journal.size() == 0){ + if (journal.size() == 0) { return -1; } @@ -231,56 +348,124 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void removeFrom(long index) { - if(index < 0 || index >= journal.size()){ + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return; } - for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){ + for (int i = adjustedIndex; + i < journal.size(); i++) { deleteMessage(i); journal.remove(i); } } - @Override public void append(final ReplicatedLogEntry replicatedLogEntry) { + @Override public void append( + final ReplicatedLogEntry replicatedLogEntry) { journal.add(replicatedLogEntry); } @Override public List getFrom(long index) { + int adjustedIndex = adjustedIndex(index); + List entries = new ArrayList<>(100); - if(index < 0 || index >= journal.size()){ + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return entries; } - for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){ + for (int i = adjustedIndex; + i < journal.size(); i++) { entries.add(journal.get(i)); } return entries; } - @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){ + @Override public void appendAndPersist( + final ReplicatedLogEntry replicatedLogEntry) { appendAndPersist(null, null, replicatedLogEntry); } - public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){ - context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex()); + public void appendAndPersist(final ActorRef clientActor, + final String identifier, + final ReplicatedLogEntry replicatedLogEntry) { + context.getLogger().debug( + "Append log entry and persist " + replicatedLogEntry.getIndex()); // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs journal.add(replicatedLogEntry); + + // When persisting events with persist it is guaranteed that the + // persistent actor will not receive further commands between the + // persist call and the execution(s) of the associated event + // handler. This also holds for multiple persist calls in context + // of a single command. persist(replicatedLogEntry, new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { + // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. + if (size() > 100000) { + ReplicatedLogEntry lastAppliedEntry = + get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } + + saveSnapshot(Snapshot.create(createSnapshot(), + getFrom(context.getLastApplied() + 1), + lastIndex(), lastTerm(), lastAppliedIndex, + lastAppliedTerm)); + } // Send message for replication - if(clientActor != null) { + if (clientActor != null) { currentBehavior.handleMessage(getSelf(), new Replicate(clientActor, identifier, - replicatedLogEntry)); + replicatedLogEntry) + ); } } - }); + } + ); } @Override public long size() { return journal.size() + snapshotIndex; } + + @Override public boolean isPresent(long index) { + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return false; + } + return true; + } + + @Override public boolean isInSnapshot(long index) { + return index <= snapshotIndex; + } + + @Override public Object getSnapshot() { + return snapshot; + } + + @Override public long getSnapshotIndex() { + return snapshotIndex; + } + + @Override public long getSnapshotTerm() { + return snapshotTerm; + } + + private int adjustedIndex(long index) { + if(snapshotIndex < 0){ + return (int) index; + } + return (int) (index - snapshotIndex); + } } + private static class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable { @@ -288,7 +473,7 @@ public abstract class RaftActor extends UntypedPersistentActor { private final long term; private final Object payload; - public ReplicatedLogImplEntry(long index, long term, Object payload){ + public ReplicatedLogImplEntry(long index, long term, Object payload) { this.index = index; this.term = term; @@ -309,4 +494,53 @@ public abstract class RaftActor extends UntypedPersistentActor { } + private static class Snapshot implements Serializable { + private final Object state; + private final List unAppliedEntries; + private final long lastIndex; + private final long lastTerm; + private final long lastAppliedIndex; + private final long lastAppliedTerm; + + private Snapshot(Object state, + List unAppliedEntries, long lastIndex, + long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { + this.state = state; + this.unAppliedEntries = unAppliedEntries; + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + } + + + public static Snapshot create(Object state, + List entries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm) { + return new Snapshot(state, entries, lastIndex, lastTerm, + lastAppliedIndex, lastAppliedTerm); + } + + public Object getState() { + return state; + } + + public List getUnAppliedEntries() { + return unAppliedEntries; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 34e7ac9768..3de0de5131 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -17,8 +17,9 @@ public interface ReplicatedLog { /** * Get a replicated log entry at the specified index * - * @param index - * @return + * @param index the index of the log entry + * @return the ReplicatedLogEntry at index. null if index less than 0 or + * greater than the size of the in-memory journal. */ ReplicatedLogEntry get(long index); @@ -45,7 +46,7 @@ public interface ReplicatedLog { /** * Remove all the entries from the logs >= index * - * @param index + * @param index the index of the log entry */ void removeFrom(long index); @@ -63,7 +64,7 @@ public interface ReplicatedLog { /** * - * @param index + * @param index the index of the log entry */ List getFrom(long index); @@ -73,4 +74,43 @@ public interface ReplicatedLog { * @return */ long size(); + + /** + * Checks if the entry at the specified index is present or not + * + * @param index the index of the log entry + * @return true if the entry is present in the in-memory journal + */ + boolean isPresent(long index); + + /** + * Checks if the entry is present in a snapshot + * + * @param index the index of the log entry + * @return true if the entry is in the snapshot. false if the entry is not + * in the snapshot even if the entry may be present in the replicated log + */ + boolean isInSnapshot(long index); + + /** + * Get the snapshot + * + * @return an object representing the snapshot if it exists. null otherwise + */ + Object getSnapshot(); + + /** + * Get the index of the snapshot + * + * @return the index from which the snapshot was created. -1 otherwise. + */ + long getSnapshotIndex(); + + /** + * Get the term of the snapshot + * + * @return the term of the index from which the snapshot was created. -1 + * otherwise + */ + long getSnapshotTerm(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 88558cac16..1cfc2e0eb9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -12,9 +12,11 @@ import akka.actor.ActorRef; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; /** @@ -165,6 +167,9 @@ public class Follower extends AbstractRaftActorBehavior { @Override public RaftState handleMessage(ActorRef sender, Object message) { if(message instanceof ElectionTimeout){ return RaftState.Candidate; + } else if(message instanceof InstallSnapshot){ + InstallSnapshot snapshot = (InstallSnapshot) message; + actor().tell(new ApplySnapshot(snapshot), actor()); } scheduleElection(electionDuration()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index ec4551ac4f..90edf7da9a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -22,12 +22,16 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,7 +68,9 @@ public class Leader extends AbstractRaftActorBehavior { private final Map followerToActor = new HashMap<>(); - private Cancellable heartbeatCancel = null; + private Cancellable heartbeatSchedule = null; + private Cancellable appendEntriesSchedule = null; + private Cancellable installSnapshotSchedule = null; private List trackerList = new ArrayList<>(); @@ -103,6 +109,10 @@ public class Leader extends AbstractRaftActorBehavior { // prevent election timeouts (§5.2) scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + scheduleInstallSnapshotCheck( + new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, + HEART_BEAT_INTERVAL.unit()) + ); } @@ -194,8 +204,14 @@ public class Leader extends AbstractRaftActorBehavior { try { if (message instanceof SendHeartBeat) { return sendHeartBeat(); + } else if(message instanceof SendInstallSnapshot) { + installSnapshotIfNeeded(); } else if (message instanceof Replicate) { replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ + // FIXME : Should I be checking the term here too? + handleInstallSnapshotReply( + (InstallSnapshotReply) message); } } finally { scheduleHeartBeat(HEART_BEAT_INTERVAL); @@ -204,6 +220,18 @@ public class Leader extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } + private void handleInstallSnapshotReply(InstallSnapshotReply message) { + InstallSnapshotReply reply = message; + String followerId = reply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + followerLogInformation + .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation + .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } + private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); @@ -244,8 +272,12 @@ public class Leader extends AbstractRaftActorBehavior { long nextIndex = followerLogInformation.getNextIndex().get(); - List entries = - context.getReplicatedLog().getFrom(nextIndex); + List entries = Collections.emptyList(); + + if(context.getReplicatedLog().isPresent(nextIndex)){ + entries = + context.getReplicatedLog().getFrom(nextIndex); + } followerActor.tell( new AppendEntries(currentTerm(), context.getId(), @@ -257,6 +289,29 @@ public class Leader extends AbstractRaftActorBehavior { } } + private void installSnapshotIfNeeded(){ + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){ + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().getSnapshot() + ), + actor() + ); + } + } + } + private RaftState sendHeartBeat() { if (followerToActor.size() > 0) { sendAppendEntries(); @@ -265,8 +320,14 @@ public class Leader extends AbstractRaftActorBehavior { } private void stopHeartBeat() { - if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { - heartbeatCancel.cancel(); + if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) { + heartbeatSchedule.cancel(); + } + } + + private void stopInstallSnapshotSchedule() { + if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { + installSnapshotSchedule.cancel(); } } @@ -284,13 +345,34 @@ public class Leader extends AbstractRaftActorBehavior { // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. - heartbeatCancel = + heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( interval, context.getActor(), new SendHeartBeat(), context.getActorSystem().dispatcher(), context.getActor()); } + + private void scheduleInstallSnapshotCheck(FiniteDuration interval) { + if(followerToActor.keySet().size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } + + stopInstallSnapshotSchedule(); + + // Schedule a message to send append entries to followers that can + // accept an append entries with some data in it + installSnapshotSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new SendInstallSnapshot(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + + @Override public void close() throws Exception { stopHeartBeat(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java new file mode 100644 index 0000000000..a7172e22c8 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +public class ApplySnapshot { + private final Object snapshot; + + public ApplySnapshot(Object snapshot) { + this.snapshot = snapshot; + } + + public Object getSnapshot() { + return snapshot; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java new file mode 100644 index 0000000000..0c370aa981 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +public class SendInstallSnapshot { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java new file mode 100644 index 0000000000..888854fa71 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class InstallSnapshot extends AbstractRaftRPC { + + private final String leaderId; + private final long lastIncludedIndex; + private final long lastIncludedTerm; + private final Object data; + + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) { + super(term); + this.leaderId = leaderId; + this.lastIncludedIndex = lastIncludedIndex; + this.lastIncludedTerm = lastIncludedTerm; + this.data = data; + } + + public String getLeaderId() { + return leaderId; + } + + public long getLastIncludedIndex() { + return lastIncludedIndex; + } + + public long getLastIncludedTerm() { + return lastIncludedTerm; + } + + public Object getData() { + return data; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java new file mode 100644 index 0000000000..85b89b70ae --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class InstallSnapshotReply extends AbstractRaftRPC { + + // The followerId - this will be used to figure out which follower is + // responding + private final String followerId; + + protected InstallSnapshotReply(long term, String followerId) { + super(term); + this.followerId = followerId; + } + + public String getFollowerId() { + return followerId; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index addf51a63c..b9c1278b8e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -50,10 +50,8 @@ public class MockRaftActorContext implements RaftActorContext { public void initReplicatedLog(){ - MockReplicatedLog mockReplicatedLog = new MockReplicatedLog(); - this.replicatedLog = mockReplicatedLog; - mockReplicatedLog.setLast(new MockReplicatedLogEntry(1,1,"")); - mockReplicatedLog.setReplicatedLogEntry(new MockReplicatedLogEntry(1,1, "")); + this.replicatedLog = new SimpleReplicatedLog(); + this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, "")); } @Override public ActorRef actorOf(Props props) { @@ -125,57 +123,6 @@ public class MockRaftActorContext implements RaftActorContext { } - public static class MockReplicatedLog implements ReplicatedLog { - private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, ""); - private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, ""); - - @Override public ReplicatedLogEntry get(long index) { - return replicatedLogEntry; - } - - @Override public ReplicatedLogEntry last() { - return last; - } - - @Override public long lastIndex() { - return last.getIndex(); - } - - @Override public long lastTerm() { - return last.getTerm(); - } - - @Override public void removeFrom(long index) { - } - - @Override public void append(ReplicatedLogEntry replicatedLogEntry) { - } - - @Override public void appendAndPersist( - ReplicatedLogEntry replicatedLogEntry) { - } - - @Override public List getFrom(long index) { - return Collections.EMPTY_LIST; - } - - @Override public long size() { - if(replicatedLogEntry != null){ - return 1; - } - return 0; - } - - public void setReplicatedLogEntry( - ReplicatedLogEntry replicatedLogEntry) { - this.replicatedLogEntry = replicatedLogEntry; - } - - public void setLast(ReplicatedLogEntry last) { - this.last = last; - } - } - public static class SimpleReplicatedLog implements ReplicatedLog { private final List log = new ArrayList<>(10000); @@ -210,6 +157,9 @@ public class MockRaftActorContext implements RaftActorContext { } @Override public void removeFrom(long index) { + if(index >= log.size() || index < 0){ + return; + } for(int i=(int) index ; i < log.size() ; i++) { log.remove(i); } @@ -225,6 +175,9 @@ public class MockRaftActorContext implements RaftActorContext { } @Override public List getFrom(long index) { + if(index >= log.size() || index < 0){ + return Collections.EMPTY_LIST; + } List entries = new ArrayList<>(); for(int i=(int) index ; i < log.size() ; i++) { entries.add(get(i)); @@ -235,6 +188,30 @@ public class MockRaftActorContext implements RaftActorContext { @Override public long size() { return log.size(); } + + @Override public boolean isPresent(long index) { + if(index >= log.size() || index < 0){ + return false; + } + + return true; + } + + @Override public boolean isInSnapshot(long index) { + return false; + } + + @Override public Object getSnapshot() { + return null; + } + + @Override public long getSnapshotIndex() { + return -1; + } + + @Override public long getSnapshotTerm() { + return -1; + } } public static class MockReplicatedLogEntry implements ReplicatedLogEntry { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 273342eb47..a7b6825c7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -211,15 +211,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { RaftActorContext actorContext = createActorContext(behaviorActor); - MockRaftActorContext.MockReplicatedLog - log = new MockRaftActorContext.MockReplicatedLog(); - log.setReplicatedLogEntry( + MockRaftActorContext.SimpleReplicatedLog + log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append( new MockRaftActorContext.MockReplicatedLogEntry(20000, 1000000, "")); - log.setLast( - new MockRaftActorContext.MockReplicatedLogEntry(20000, - 1000000, "") - ); ((MockRaftActorContext) actorContext).setReplicatedLog(log); @@ -307,18 +303,17 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { assertEquals(RaftState.Follower, raftState); } - protected MockRaftActorContext.MockReplicatedLog setLastLogEntry( + protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry( MockRaftActorContext actorContext, long term, long index, Object data) { return setLastLogEntry(actorContext, new MockRaftActorContext.MockReplicatedLogEntry(term, index, data)); } - protected MockRaftActorContext.MockReplicatedLog setLastLogEntry( + protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry( MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) { - MockRaftActorContext.MockReplicatedLog - log = new MockRaftActorContext.MockReplicatedLog(); - // By default MockReplicateLog has last entry set to (1,1,"") - log.setLast(logEntry); + MockRaftActorContext.SimpleReplicatedLog + log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(logEntry); actorContext.setReplicatedLog(log); return log; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index ca0e13db03..677442402c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -190,14 +190,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // Set the last log entry term for the receiver to be greater than // what we will be sending as the prevLogTerm in AppendEntries - MockRaftActorContext.MockReplicatedLog mockReplicatedLog = + MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog = setLastLogEntry(context, 20, 0, ""); - // Also set the entry at index 0 with term 20 which will be greater - // than the prevLogTerm sent by the sender - mockReplicatedLog.setReplicatedLogEntry( - new MockRaftActorContext.MockReplicatedLogEntry(20, 0, "")); - // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term AppendEntries appendEntries =