From 43fbc0b14b21dc32ed8a14128453dd1581920f5a Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Wed, 6 Aug 2014 16:25:41 -0700 Subject: [PATCH] Install snapshot and Reply 1. InstallSnaphot change in Leader 2. Moved Snapshot out of RaftActor 3. Follower ApplySnapshot changes 4. InstallSnapshotReply changes in Leader 5. Installing Snapshot made in chunks 6. Implemented ProtocolBuffer for InstallSnapshots 7. Sanpshots as ByteStrings 8. Create snapshots made aysnc using CaptureSnapshot messages 9. Save Snapshot Failure Rollback 10. LeaderTest for send snapshot and snaphot reply Change-Id: I16acc20ac7f671e89c5fbdb7aebd6dbe72e5f6e0 Signed-off-by: Kamal Rameshan --- .../cluster/example/ExampleActor.java | 66 +- .../example/ExampleConfigParamsImpl.java | 5 + .../cluster/example/TestDriver.java | 2 + .../raft/AbstractReplicatedLogImpl.java | 82 +- .../controller/cluster/raft/ConfigParams.java | 5 + .../cluster/raft/DefaultConfigParamsImpl.java | 7 + .../controller/cluster/raft/RaftActor.java | 184 +-- .../cluster/raft/ReplicatedLog.java | 49 +- .../cluster/raft/SerializationUtils.java | 4 + .../controller/cluster/raft/Snapshot.java | 76 ++ .../raft/base/messages/ApplySnapshot.java | 11 +- .../raft/base/messages/CaptureSnapshot.java | 40 + .../base/messages/CaptureSnapshotReply.java | 26 + .../behaviors/AbstractRaftActorBehavior.java | 11 +- .../cluster/raft/behaviors/Follower.java | 61 +- .../cluster/raft/behaviors/Leader.java | 264 ++++- .../raft/messages/InstallSnapshot.java | 47 +- .../raft/messages/InstallSnapshotReply.java | 15 +- .../messages/InstallSnapshotMessages.java | 1015 +++++++++++++++++ .../src/main/resources/InstallSnapshot.proto | 15 + .../cluster/raft/MockRaftActorContext.java | 119 +- .../cluster/raft/RaftActorTest.java | 5 +- .../cluster/raft/behaviors/FollowerTest.java | 7 +- .../cluster/raft/behaviors/LeaderTest.java | 376 +++++- .../controller/cluster/datastore/Shard.java | 7 +- 25 files changed, 2228 insertions(+), 271 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index cbd7ca2d70..c4ff108611 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -12,14 +12,21 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; import com.google.common.base.Optional; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; @@ -82,14 +89,63 @@ public class ExampleActor extends RaftActor { } } - @Override protected Object createSnapshot() { - return state; + @Override protected void createSnapshot() { + ByteString bs = null; + try { + bs = fromObject(state); + } catch (Exception e) { + LOG.error("Exception in creating snapshot", e); + } + getSelf().tell(new CaptureSnapshotReply(bs), null); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { state.clear(); - state.putAll((HashMap) snapshot); - LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size()); + try { + state.putAll((HashMap) toObject(snapshot)); + } catch (Exception e) { + LOG.error("Exception in applying snapshot", e); + } + LOG.debug("Snapshot applied to state :" + ((HashMap) state).size()); + } + + private ByteString fromObject(Object snapshot) throws Exception { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(snapshot); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } + + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + Object obj = null; + ByteArrayInputStream bis = null; + ObjectInputStream ois = null; + try { + bis = new ByteArrayInputStream(bs.toByteArray()); + ois = new ObjectInputStream(bis); + obj = ois.readObject(); + } finally { + if (bis != null) { + bis.close(); + } + if (ois != null) { + ois.close(); + } + } + return obj; } @Override protected void onStateChanged() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java index d11377dbcb..6192cad230 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java @@ -17,4 +17,9 @@ public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl { public long getSnapshotBatchCount() { return 50; } + + @Override + public int getSnapshotChunkSize() { + return 50; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java index fd6e192bf0..978ea91089 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -109,6 +109,8 @@ public class TestDriver { td.printState(); } else if (command.startsWith("printNodes")) { td.printNodes(); + } else { + System.out.println("Invalid command:" + command); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index b5b034afb9..b436bce500 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.protobuf.ByteString; + import java.util.ArrayList; import java.util.List; @@ -16,12 +18,18 @@ import java.util.List; */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { - protected final List journal; - protected final Object snapshot; + protected List journal; + protected ByteString snapshot; protected long snapshotIndex = -1; protected long snapshotTerm = -1; - public AbstractReplicatedLogImpl(Object state, long snapshotIndex, + // to be used for rollback during save snapshot failure + protected List snapshottedJournal; + protected ByteString previousSnapshot; + protected long previousSnapshotIndex = -1; + protected long previousSnapshotTerm = -1; + + public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex, long snapshotTerm, List unAppliedEntries) { this.snapshot = state; this.snapshotIndex = snapshotIndex; @@ -137,11 +145,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public boolean isInSnapshot(long logEntryIndex) { - return logEntryIndex <= snapshotIndex; + return logEntryIndex <= snapshotIndex && snapshotIndex != -1; } @Override - public Object getSnapshot() { + public ByteString getSnapshot() { return snapshot; } @@ -160,4 +168,68 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public abstract void removeFromAndPersist(long index); + + @Override + public void setSnapshotIndex(long snapshotIndex) { + this.snapshotIndex = snapshotIndex; + } + + @Override + public void setSnapshotTerm(long snapshotTerm) { + this.snapshotTerm = snapshotTerm; + } + + @Override + public void setSnapshot(ByteString snapshot) { + this.snapshot = snapshot; + } + + @Override + public void clear(int startIndex, int endIndex) { + journal.subList(startIndex, endIndex).clear(); + } + + @Override + public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) { + snapshottedJournal = new ArrayList<>(journal.size()); + + snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex))); + clear(0, (int) (snapshotCapturedIndex - snapshotIndex)); + + previousSnapshotIndex = snapshotIndex; + setSnapshotIndex(snapshotCapturedIndex); + + previousSnapshotTerm = snapshotTerm; + setSnapshotTerm(snapshotCapturedTerm); + + previousSnapshot = getSnapshot(); + setSnapshot(snapshot); + } + + @Override + public void snapshotCommit() { + snapshottedJournal.clear(); + snapshottedJournal = null; + previousSnapshotIndex = -1; + previousSnapshotTerm = -1; + previousSnapshot = null; + } + + @Override + public void snapshotRollback() { + snapshottedJournal.addAll(journal); + journal.clear(); + journal = snapshottedJournal; + snapshottedJournal = null; + + snapshotIndex = previousSnapshotIndex; + previousSnapshotIndex = -1; + + snapshotTerm = previousSnapshotTerm; + previousSnapshotTerm = -1; + + snapshot = previousSnapshot; + previousSnapshot = null; + + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java index 4c6434aec4..ed6439d8c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -52,4 +52,9 @@ public interface ConfigParams { * @return int */ public int getElectionTimeVariance(); + + /** + * The size (in bytes) of the snapshot chunk sent from Leader + */ + public int getSnapshotChunkSize(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 6432fa4811..75c237f503 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -25,6 +25,8 @@ public class DefaultConfigParamsImpl implements ConfigParams { */ private static final int ELECTION_TIME_MAX_VARIANCE = 100; + private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB + /** * The interval at which a heart beat message will be sent to the remote @@ -58,4 +60,9 @@ public class DefaultConfigParamsImpl implements ConfigParams { public int getElectionTimeVariance() { return ELECTION_TIME_MAX_VARIANCE; } + + @Override + public int getSnapshotChunkSize() { + return SNAPSHOT_CHUNK_SIZE; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 988789b401..296ce2d24a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import com.google.common.base.Optional; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import java.io.Serializable; -import java.util.List; import java.util.Map; /** @@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor { */ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); + private CaptureSnapshot captureSnapshot = null; + + private volatile boolean hasSnapshotCaptureInitiated = false; public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); @@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog = new ReplicatedLogImpl(snapshot); context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); LOG.debug("Applied snapshot to replicatedLog. " + "snapshotIndex={}, snapshotTerm={}, journal-size={}", @@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog.size()); // Apply the snapshot to the actors state - applySnapshot(snapshot.getState()); + applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { replicatedLog.append((ReplicatedLogEntry) message); @@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor { applyState.getReplicatedLogEntry().getData()); } else if(message instanceof ApplySnapshot ) { - applySnapshot(((ApplySnapshot) message).getSnapshot()); + Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); + + LOG.debug("ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm()); + applySnapshot(ByteString.copyFrom(snapshot.getState())); + + //clears the followers log, sets the snapshot index to ensure adjusted-index works + replicatedLog = new ReplicatedLogImpl(snapshot); + context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); } else if (message instanceof FindLeader) { getSender().tell( @@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof SaveSnapshotSuccess) { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; + LOG.info("SaveSnapshotSuccess received for snapshot"); + + context.getReplicatedLog().snapshotCommit(); // TODO: Not sure if we want to be this aggressive with trimming stuff trimPersistentData(success.metadata().sequenceNr()); } else if (message instanceof SaveSnapshotFailure) { + SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; + + LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString()); + LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:"); - // TODO: Handle failure in saving the snapshot + context.getReplicatedLog().snapshotRollback(); + + LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); } else if (message instanceof AddRaftPeer){ @@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor { RemoveRaftPeer rrp = (RemoveRaftPeer)message; context.removePeer(rrp.getName()); + } else if (message instanceof CaptureSnapshot) { + LOG.debug("CaptureSnapshot received by actor"); + CaptureSnapshot cs = (CaptureSnapshot)message; + captureSnapshot = cs; + createSnapshot(); + + } else if (message instanceof CaptureSnapshotReply){ + LOG.debug("CaptureSnapshotReply received by actor"); + CaptureSnapshotReply csr = (CaptureSnapshotReply) message; + + ByteString stateInBytes = csr.getSnapshot(); + LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + handleCaptureSnapshotReply(stateInBytes); + } else { + if (!(message instanceof AppendEntriesMessages.AppendEntries) + && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { + LOG.debug("onReceiveCommand: message:" + message.getClass()); + } RaftState state = currentBehavior.handleMessage(getSender(), message); @@ -344,7 +394,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @return The current state of the actor */ - protected abstract Object createSnapshot(); + protected abstract void createSnapshot(); /** * This method will be called by the RaftActor during recovery to @@ -356,7 +406,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @param snapshot A snapshot of the state of the actor */ - protected abstract void applySnapshot(Object snapshot); + protected abstract void applySnapshot(ByteString snapshot); /** * This method will be called by the RaftActor when the state of the @@ -423,11 +473,39 @@ public abstract class RaftActor extends UntypedPersistentActor { return peerAddress; } + private void handleCaptureSnapshotReply(ByteString stateInBytes) { + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + + Snapshot sn = Snapshot.create(stateInBytes.toByteArray(), + context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), + captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + + saveSnapshot(sn); + + LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + + //be greedy and remove entries from in-mem journal which are in the snapshot + // and update snapshotIndex and snapshotTerm without waiting for the success, + + context.getReplicatedLog().snapshotPreCommit(stateInBytes, + captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + captureSnapshot = null; + hasSnapshotCaptureInitiated = false; + } + private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - super(snapshot.getState(), + super(ByteString.copyFrom(snapshot.getState()), snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), snapshot.getUnAppliedEntries()); } @@ -476,8 +554,10 @@ public abstract class RaftActor extends UntypedPersistentActor { persist(replicatedLogEntry, new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { - // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) { + // when a snaphsot is being taken, captureSnapshot != null + if (hasSnapshotCaptureInitiated == false && + journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) { + LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -493,26 +573,11 @@ public abstract class RaftActor extends UntypedPersistentActor { LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); - // create a snapshot object from the state provided and save it - // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(createSnapshot(), - getFrom(context.getLastApplied() + 1), - lastIndex(), lastTerm(), lastAppliedIndex, - lastAppliedTerm); - saveSnapshot(sn); - - LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); - - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, - // TODO: damage-recovery to be done on failure - journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear(); - snapshotIndex = lastAppliedIndex; - snapshotTerm = lastAppliedTerm; - - LOG.info("Removed in-memory snapshotted entries, " + - "adjusted snaphsotIndex:{}" + - "and term:{}", snapshotIndex, lastAppliedTerm); + // send a CaptureSnapshot to self to make the expensive operation async. + getSelf().tell(new CaptureSnapshot( + lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + null); + hasSnapshotCaptureInitiated = true; } // Send message for replication if (clientActor != null) { @@ -542,65 +607,6 @@ public abstract class RaftActor extends UntypedPersistentActor { } - private static class Snapshot implements Serializable { - private final Object state; - private final List unAppliedEntries; - private final long lastIndex; - private final long lastTerm; - private final long lastAppliedIndex; - private final long lastAppliedTerm; - - private Snapshot(Object state, - List unAppliedEntries, long lastIndex, - long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { - this.state = state; - this.unAppliedEntries = unAppliedEntries; - this.lastIndex = lastIndex; - this.lastTerm = lastTerm; - this.lastAppliedIndex = lastAppliedIndex; - this.lastAppliedTerm = lastAppliedTerm; - } - - - public static Snapshot create(Object state, - List entries, long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm) { - return new Snapshot(state, entries, lastIndex, lastTerm, - lastAppliedIndex, lastAppliedTerm); - } - - public Object getState() { - return state; - } - - public List getUnAppliedEntries() { - return unAppliedEntries; - } - - public long getLastTerm() { - return lastTerm; - } - - public long getLastAppliedIndex() { - return lastAppliedIndex; - } - - public long getLastAppliedTerm() { - return lastAppliedTerm; - } - - public String getLogMessage() { - StringBuilder sb = new StringBuilder(); - return sb.append("Snapshot={") - .append("lastTerm:" + this.getLastTerm() + ", ") - .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") - .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") - .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") - .toString(); - - } - } - private class ElectionTermImpl implements ElectionTerm { /** * Identifier of the actor whose election term information this is diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index e6e160bc02..c17f5448c6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.raft; +import com.google.protobuf.ByteString; + import java.util.List; /** @@ -118,7 +120,7 @@ public interface ReplicatedLog { * * @return an object representing the snapshot if it exists. null otherwise */ - Object getSnapshot(); + ByteString getSnapshot(); /** * Get the index of the snapshot @@ -134,4 +136,49 @@ public interface ReplicatedLog { * otherwise */ long getSnapshotTerm(); + + /** + * sets the snapshot index in the replicated log + * @param snapshotIndex + */ + void setSnapshotIndex(long snapshotIndex); + + /** + * sets snapshot term + * @param snapshotTerm + */ + public void setSnapshotTerm(long snapshotTerm); + + /** + * sets the snapshot in bytes + * @param snapshot + */ + public void setSnapshot(ByteString snapshot); + + /** + * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive) + * @param startIndex + * @param endIndex + */ + public void clear(int startIndex, int endIndex); + + /** + * Handles all the bookkeeping in order to perform a rollback in the + * event of SaveSnapshotFailure + * @param snapshot + * @param snapshotCapturedIndex + * @param snapshotCapturedTerm + */ + public void snapshotPreCommit(ByteString snapshot, + long snapshotCapturedIndex, long snapshotCapturedTerm); + + /** + * Sets the Replicated log to state after snapshot success. + */ + public void snapshotCommit(); + + /** + * Restores the replicated log to a state in the event of a save snapshot failure + */ + public void snapshotRollback(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java index 374e0fa9ba..2f5ba48f92 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java @@ -9,12 +9,16 @@ package org.opendaylight.controller.cluster.raft; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; public class SerializationUtils { public static Object fromSerializable(Object serializable){ if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){ return AppendEntries.fromSerializable(serializable); + + } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) { + return InstallSnapshot.fromSerializable(serializable); } return serializable; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java new file mode 100644 index 0000000000..8e0fcca9f7 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import java.io.Serializable; +import java.util.List; + + +public class Snapshot implements Serializable { + private final byte[] state; + private final List unAppliedEntries; + private final long lastIndex; + private final long lastTerm; + private final long lastAppliedIndex; + private final long lastAppliedTerm; + + private Snapshot(byte[] state, + List unAppliedEntries, long lastIndex, + long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { + this.state = state; + this.unAppliedEntries = unAppliedEntries; + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + } + + + public static Snapshot create(byte[] state, + List entries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm) { + return new Snapshot(state, entries, lastIndex, lastTerm, + lastAppliedIndex, lastAppliedTerm); + } + + public byte[] getState() { + return state; + } + + public List getUnAppliedEntries() { + return unAppliedEntries; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + + public long getLastIndex() { + return this.lastIndex; + } + + public String getLogMessage() { + StringBuilder sb = new StringBuilder(); + return sb.append("Snapshot={") + .append("lastTerm:" + this.getLastTerm() + ", ") + .append("lastIndex:" + this.getLastIndex() + ", ") + .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") + .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") + .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") + .toString(); + + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java index 9739fb2f1b..c356804223 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java @@ -8,16 +8,21 @@ package org.opendaylight.controller.cluster.raft.base.messages; +import org.opendaylight.controller.cluster.raft.Snapshot; + import java.io.Serializable; +/** + * Internal message, issued by follower to its actor + */ public class ApplySnapshot implements Serializable { - private final Object snapshot; + private final Snapshot snapshot; - public ApplySnapshot(Object snapshot) { + public ApplySnapshot(Snapshot snapshot) { this.snapshot = snapshot; } - public Object getSnapshot() { + public Snapshot getSnapshot() { return snapshot; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java new file mode 100644 index 0000000000..bb86e1a37d --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.base.messages; + +public class CaptureSnapshot { + private long lastAppliedIndex; + private long lastAppliedTerm; + private long lastIndex; + private long lastTerm; + + public CaptureSnapshot(long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm) { + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + + public long getLastIndex() { + return lastIndex; + } + + public long getLastTerm() { + return lastTerm; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java new file mode 100644 index 0000000000..96150db689 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.base.messages; + +import com.google.protobuf.ByteString; + +public class CaptureSnapshotReply { + private ByteString snapshot; + + public CaptureSnapshotReply(ByteString snapshot) { + this.snapshot = snapshot; + } + + public ByteString getSnapshot() { + return snapshot; + } + + public void setSnapshot(ByteString snapshot) { + this.snapshot = snapshot; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 251a13d583..7e896fed29 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -305,6 +305,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param index a log index that is known to be committed */ protected void applyLogToStateMachine(final long index) { + long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; i < index + 1; i++) { @@ -322,15 +323,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (replicatedLogEntry != null) { actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor()); + newLastApplied = i; } else { + //if one index is not present in the log, no point in looping + // around as the rest wont be present either context.getLogger().error( - "Missing index " + i + " from log. Cannot apply state."); + "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index ); + break; } } // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - context.getLogger().debug("Setting last applied to {}", index); - context.setLastApplied(index); + context.getLogger().debug("Setting last applied to {}", newLastApplied); + context.setLastApplied(newLastApplied); } protected Object fromSerializableMessage(Object serializable){ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 54e0494b9d..610fdc987f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,17 +9,22 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import java.util.ArrayList; + /** * The behavior of a RaftActor in the Follower state *

@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; * */ public class Follower extends AbstractRaftActorBehavior { + private ByteString snapshotChunksCollected = ByteString.EMPTY; + public Follower(RaftActorContext context) { super(context); @@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior { if (outOfSync) { // We found that the log was out of sync so just send a negative // reply and return + context.getLogger().debug("Follower is out-of-sync, " + + "so sending negative reply, lastIndex():{}, lastTerm():{}", + lastIndex(), lastTerm()); sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() @@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior { // If commitIndex > lastApplied: increment lastApplied, apply // log[lastApplied] to state machine (§5.3) - if (appendEntries.getLeaderCommit() > context.getLastApplied()) { + // check if there are any entries to be applied. last-applied can be equal to last-index + if (appendEntries.getLeaderCommit() > context.getLastApplied() && + context.getLastApplied() < lastIndex()) { + context.getLogger().debug("applyLogToStateMachine, " + + "appendEntries.getLeaderCommit():{}," + + "context.getLastApplied():{}, lastIndex():{}", + appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()); applyLogToStateMachine(appendEntries.getLeaderCommit()); } @@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior { } else if (message instanceof InstallSnapshot) { InstallSnapshot installSnapshot = (InstallSnapshot) message; - actor().tell(new ApplySnapshot(installSnapshot.getData()), actor()); + handleInstallSnapshot(sender, installSnapshot); } scheduleElection(electionDuration()); @@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } + private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { + context.getLogger().debug("InstallSnapshot received by follower " + + "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(), + installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()); + + try { + if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) { + // this is the last chunk, create a snapshot object and apply + + snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); + context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}", + snapshotChunksCollected.size()); + + Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(), + new ArrayList(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm()); + + actor().tell(new ApplySnapshot(snapshot), actor()); + + } else { + // we have more to go + snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); + context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}", + installSnapshot.getChunkIndex(), snapshotChunksCollected.size()); + } + + sender.tell(new InstallSnapshotReply( + currentTerm(), context.getId(), installSnapshot.getChunkIndex(), + true), actor()); + + } catch (Exception e) { + context.getLogger().error("Exception in InstallSnapshot of follower", e); + //send reply with success as false. The chunk will be sent again on failure + sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), + installSnapshot.getChunkIndex(), false), actor()); + } + } + @Override public void close() throws Exception { stopElection(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 234f9db664..90948ffef7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,8 +66,9 @@ import java.util.concurrent.atomic.AtomicLong; public class Leader extends AbstractRaftActorBehavior { - private final Map followerToLog = + protected final Map followerToLog = new HashMap(); + protected final Map mapFollowerToSnapshot = new HashMap<>(); private final Set followers; @@ -246,16 +249,48 @@ public class Leader extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private void handleInstallSnapshotReply(InstallSnapshotReply message) { - InstallSnapshotReply reply = message; + private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + FollowerToSnapshot followerToSnapshot = + mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot != null && + followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + + if (reply.isSuccess()) { + if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + //this was the last chunk reply + context.getLogger().debug("InstallSnapshotReply received, " + + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", + reply.getChunkIndex(), followerId, + context.getReplicatedLog().getSnapshotIndex() + 1); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + followerLogInformation.setMatchIndex( + context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation.setNextIndex( + context.getReplicatedLog().getSnapshotIndex() + 1); + mapFollowerToSnapshot.remove(followerId); + context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" + + followerToLog.get(followerId).getNextIndex().get()); + + } else { + followerToSnapshot.markSendStatus(true); + } + } else { + context.getLogger().info("InstallSnapshotReply received, " + + "sending snapshot chunk failed, Will retry, Chunk:{}", + reply.getChunkIndex()); + followerToSnapshot.markSendStatus(false); + } - followerLogInformation - .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation - .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } else { + context.getLogger().error("ERROR!!" + + "FollowerId in InstallSnapshotReply not known to Leader" + + " or Chunk Index in InstallSnapshotReply not matching {} != {}", + followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); + } } private void replicate(Replicate replicate) { @@ -282,30 +317,56 @@ public class Leader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); + ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - long nextIndex = followerLogInformation.getNextIndex().get(); - + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + long followerNextIndex = followerLogInformation.getNextIndex().get(); List entries = Collections.emptyList(); - if (context.getReplicatedLog().isPresent(nextIndex)) { - // FIXME : Sending one entry at a time - entries = - context.getReplicatedLog().getFrom(nextIndex, 1); + if (mapFollowerToSnapshot.get(followerId) != null) { + if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } + + } else { + + if (context.getReplicatedLog().isPresent(followerNextIndex)) { + // FIXME : Sending one entry at a time + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + + } else { + // if the followers next index is not present in the leaders log, then snapshot should be sent + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) { + // if the follower is just not starting and leader's index + // is more than followers index + context.getLogger().debug("SendInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex); + + actor().tell(new SendInstallSnapshot(), actor()); + } else { + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } + } } - - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(nextIndex), - prevLogTerm(nextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); } } } @@ -326,21 +387,55 @@ public class Leader extends AbstractRaftActorBehavior { long nextIndex = followerLogInformation.getNextIndex().get(); - if (!context.getReplicatedLog().isPresent(nextIndex) && context - .getReplicatedLog().isInSnapshot(nextIndex)) { - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().getSnapshot() - ), - actor() - ); + if (!context.getReplicatedLog().isPresent(nextIndex) && + context.getReplicatedLog().isInSnapshot(nextIndex)) { + sendSnapshotChunk(followerActor, followerId); } } } } + /** + * Sends a snapshot chunk to a given follower + * InstallSnapshot should qualify as a heartbeat too. + */ + private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { + try { + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + getNextSnapshotChunk(followerId, + context.getReplicatedLog().getSnapshot()), + mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks() + ).toSerializable(), + actor() + ); + context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}", + followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks()); + } catch (IOException e) { + context.getLogger().error("InstallSnapshot failed for Leader.", e); + } + } + + /** + * Acccepts snaphot as ByteString, enters into map for future chunks + * creates and return a ByteString chunk + */ + private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + if (followerToSnapshot == null) { + followerToSnapshot = new FollowerToSnapshot(snapshotBytes); + mapFollowerToSnapshot.put(followerId, followerToSnapshot); + } + ByteString nextChunk = followerToSnapshot.getNextChunk(); + context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + + return nextChunk; + } + private RaftState sendHeartBeat() { if (followers.size() > 0) { sendAppendEntries(); @@ -410,4 +505,97 @@ public class Leader extends AbstractRaftActorBehavior { return context.getId(); } + /** + * Encapsulates the snapshot bytestring and handles the logic of sending + * snapshot chunks + */ + protected class FollowerToSnapshot { + private ByteString snapshotBytes; + private int offset = 0; + // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset + private int replyReceivedForOffset; + // if replyStatus is false, the previous chunk is attempted + private boolean replyStatus = false; + private int chunkIndex; + private int totalChunks; + + public FollowerToSnapshot(ByteString snapshotBytes) { + this.snapshotBytes = snapshotBytes; + replyReceivedForOffset = -1; + chunkIndex = 1; + int size = snapshotBytes.size(); + totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); + context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}", + size, totalChunks); + } + + public ByteString getSnapshotBytes() { + return snapshotBytes; + } + + public int incrementOffset() { + if(replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + offset = offset + context.getConfigParams().getSnapshotChunkSize(); + } + return offset; + } + + public int incrementChunkIndex() { + if (replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + chunkIndex = chunkIndex + 1; + } + return chunkIndex; + } + + public int getChunkIndex() { + return chunkIndex; + } + + public int getTotalChunks() { + return totalChunks; + } + + public boolean canSendNextChunk() { + // we only send a false if a chunk is sent but we have not received a reply yet + return replyReceivedForOffset == offset; + } + + public boolean isLastChunk(int chunkIndex) { + return totalChunks == chunkIndex; + } + + public void markSendStatus(boolean success) { + if (success) { + // if the chunk sent was successful + replyReceivedForOffset = offset; + replyStatus = true; + } else { + // if the chunk sent was failure + replyReceivedForOffset = offset; + replyStatus = false; + } + } + + public ByteString getNextChunk() { + int snapshotLength = getSnapshotBytes().size(); + int start = incrementOffset(); + int size = context.getConfigParams().getSnapshotChunkSize(); + if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { + size = snapshotLength; + } else { + if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; + } + } + + context.getLogger().debug("length={}, offset={},size={}", + snapshotLength, start, size); + return getSnapshotBytes().substring(start, start + size); + + } + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java index 888854fa71..9d40fa3d9e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -8,19 +8,29 @@ package org.opendaylight.controller.cluster.raft.messages; +import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages; + public class InstallSnapshot extends AbstractRaftRPC { + public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class; + private final String leaderId; private final long lastIncludedIndex; private final long lastIncludedTerm; - private final Object data; + private final ByteString data; + private final int chunkIndex; + private final int totalChunks; - public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) { + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, + long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) { super(term); this.leaderId = leaderId; this.lastIncludedIndex = lastIncludedIndex; this.lastIncludedTerm = lastIncludedTerm; this.data = data; + this.chunkIndex = chunkIndex; + this.totalChunks = totalChunks; } public String getLeaderId() { @@ -35,7 +45,38 @@ public class InstallSnapshot extends AbstractRaftRPC { return lastIncludedTerm; } - public Object getData() { + public ByteString getData() { return data; } + + public int getChunkIndex() { + return chunkIndex; + } + + public int getTotalChunks() { + return totalChunks; + } + + public Object toSerializable(){ + return InstallSnapshotMessages.InstallSnapshot.newBuilder() + .setLeaderId(this.getLeaderId()) + .setChunkIndex(this.getChunkIndex()) + .setData(this.getData()) + .setLastIncludedIndex(this.getLastIncludedIndex()) + .setLastIncludedTerm(this.getLastIncludedTerm()) + .setTotalChunks(this.getTotalChunks()).build(); + + } + + public static InstallSnapshot fromSerializable (Object o) { + InstallSnapshotMessages.InstallSnapshot from = + (InstallSnapshotMessages.InstallSnapshot) o; + + InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(), + from.getLeaderId(), from.getLastIncludedIndex(), + from.getLastIncludedTerm(), from.getData(), + from.getChunkIndex(), from.getTotalChunks()); + + return installSnapshot; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java index 85b89b70ae..d293a47c8e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java @@ -13,13 +13,26 @@ public class InstallSnapshotReply extends AbstractRaftRPC { // The followerId - this will be used to figure out which follower is // responding private final String followerId; + private final int chunkIndex; + private boolean success; - protected InstallSnapshotReply(long term, String followerId) { + public InstallSnapshotReply(long term, String followerId, int chunkIndex, + boolean success) { super(term); this.followerId = followerId; + this.chunkIndex = chunkIndex; + this.success = success; } public String getFollowerId() { return followerId; } + + public int getChunkIndex() { + return chunkIndex; + } + + public boolean isSuccess() { + return success; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java new file mode 100644 index 0000000000..e801ae1c10 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java @@ -0,0 +1,1015 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: InstallSnapshot.proto + +package org.opendaylight.controller.cluster.raft.protobuff.messages; + +public final class InstallSnapshotMessages { + private InstallSnapshotMessages() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface InstallSnapshotOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional int64 term = 1; + /** + * optional int64 term = 1; + */ + boolean hasTerm(); + /** + * optional int64 term = 1; + */ + long getTerm(); + + // optional string leaderId = 2; + /** + * optional string leaderId = 2; + */ + boolean hasLeaderId(); + /** + * optional string leaderId = 2; + */ + java.lang.String getLeaderId(); + /** + * optional string leaderId = 2; + */ + com.google.protobuf.ByteString + getLeaderIdBytes(); + + // optional int64 lastIncludedIndex = 3; + /** + * optional int64 lastIncludedIndex = 3; + */ + boolean hasLastIncludedIndex(); + /** + * optional int64 lastIncludedIndex = 3; + */ + long getLastIncludedIndex(); + + // optional int64 lastIncludedTerm = 4; + /** + * optional int64 lastIncludedTerm = 4; + */ + boolean hasLastIncludedTerm(); + /** + * optional int64 lastIncludedTerm = 4; + */ + long getLastIncludedTerm(); + + // optional bytes data = 5; + /** + * optional bytes data = 5; + */ + boolean hasData(); + /** + * optional bytes data = 5; + */ + com.google.protobuf.ByteString getData(); + + // optional int32 chunkIndex = 6; + /** + * optional int32 chunkIndex = 6; + */ + boolean hasChunkIndex(); + /** + * optional int32 chunkIndex = 6; + */ + int getChunkIndex(); + + // optional int32 totalChunks = 7; + /** + * optional int32 totalChunks = 7; + */ + boolean hasTotalChunks(); + /** + * optional int32 totalChunks = 7; + */ + int getTotalChunks(); + } + /** + * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot} + */ + public static final class InstallSnapshot extends + com.google.protobuf.GeneratedMessage + implements InstallSnapshotOrBuilder { + // Use InstallSnapshot.newBuilder() to construct. + private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final InstallSnapshot defaultInstance; + public static InstallSnapshot getDefaultInstance() { + return defaultInstance; + } + + public InstallSnapshot getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private InstallSnapshot( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + term_ = input.readInt64(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + leaderId_ = input.readBytes(); + break; + } + case 24: { + bitField0_ |= 0x00000004; + lastIncludedIndex_ = input.readInt64(); + break; + } + case 32: { + bitField0_ |= 0x00000008; + lastIncludedTerm_ = input.readInt64(); + break; + } + case 42: { + bitField0_ |= 0x00000010; + data_ = input.readBytes(); + break; + } + case 48: { + bitField0_ |= 0x00000020; + chunkIndex_ = input.readInt32(); + break; + } + case 56: { + bitField0_ |= 0x00000040; + totalChunks_ = input.readInt32(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public InstallSnapshot parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new InstallSnapshot(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional int64 term = 1; + public static final int TERM_FIELD_NUMBER = 1; + private long term_; + /** + * optional int64 term = 1; + */ + public boolean hasTerm() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int64 term = 1; + */ + public long getTerm() { + return term_; + } + + // optional string leaderId = 2; + public static final int LEADERID_FIELD_NUMBER = 2; + private java.lang.Object leaderId_; + /** + * optional string leaderId = 2; + */ + public boolean hasLeaderId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string leaderId = 2; + */ + public java.lang.String getLeaderId() { + java.lang.Object ref = leaderId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + leaderId_ = s; + } + return s; + } + } + /** + * optional string leaderId = 2; + */ + public com.google.protobuf.ByteString + getLeaderIdBytes() { + java.lang.Object ref = leaderId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + leaderId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional int64 lastIncludedIndex = 3; + public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3; + private long lastIncludedIndex_; + /** + * optional int64 lastIncludedIndex = 3; + */ + public boolean hasLastIncludedIndex() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public long getLastIncludedIndex() { + return lastIncludedIndex_; + } + + // optional int64 lastIncludedTerm = 4; + public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4; + private long lastIncludedTerm_; + /** + * optional int64 lastIncludedTerm = 4; + */ + public boolean hasLastIncludedTerm() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public long getLastIncludedTerm() { + return lastIncludedTerm_; + } + + // optional bytes data = 5; + public static final int DATA_FIELD_NUMBER = 5; + private com.google.protobuf.ByteString data_; + /** + * optional bytes data = 5; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes data = 5; + */ + public com.google.protobuf.ByteString getData() { + return data_; + } + + // optional int32 chunkIndex = 6; + public static final int CHUNKINDEX_FIELD_NUMBER = 6; + private int chunkIndex_; + /** + * optional int32 chunkIndex = 6; + */ + public boolean hasChunkIndex() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int32 chunkIndex = 6; + */ + public int getChunkIndex() { + return chunkIndex_; + } + + // optional int32 totalChunks = 7; + public static final int TOTALCHUNKS_FIELD_NUMBER = 7; + private int totalChunks_; + /** + * optional int32 totalChunks = 7; + */ + public boolean hasTotalChunks() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional int32 totalChunks = 7; + */ + public int getTotalChunks() { + return totalChunks_; + } + + private void initFields() { + term_ = 0L; + leaderId_ = ""; + lastIncludedIndex_ = 0L; + lastIncludedTerm_ = 0L; + data_ = com.google.protobuf.ByteString.EMPTY; + chunkIndex_ = 0; + totalChunks_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt64(1, term_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getLeaderIdBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, lastIncludedIndex_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeInt64(4, lastIncludedTerm_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, data_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeInt32(6, chunkIndex_); + } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeInt32(7, totalChunks_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(1, term_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getLeaderIdBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(3, lastIncludedIndex_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(4, lastIncludedTerm_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, data_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(6, chunkIndex_); + } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(7, totalChunks_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class); + } + + // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + term_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + leaderId_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + lastIncludedIndex_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + lastIncludedTerm_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); + data_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + chunkIndex_ = 0; + bitField0_ = (bitField0_ & ~0x00000020); + totalChunks_ = 0; + bitField0_ = (bitField0_ & ~0x00000040); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + } + + public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance(); + } + + public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() { + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() { + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.term_ = term_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.leaderId_ = leaderId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.lastIncludedIndex_ = lastIncludedIndex_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.lastIncludedTerm_ = lastIncludedTerm_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.data_ = data_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.chunkIndex_ = chunkIndex_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.totalChunks_ = totalChunks_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) { + return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) { + if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this; + if (other.hasTerm()) { + setTerm(other.getTerm()); + } + if (other.hasLeaderId()) { + bitField0_ |= 0x00000002; + leaderId_ = other.leaderId_; + onChanged(); + } + if (other.hasLastIncludedIndex()) { + setLastIncludedIndex(other.getLastIncludedIndex()); + } + if (other.hasLastIncludedTerm()) { + setLastIncludedTerm(other.getLastIncludedTerm()); + } + if (other.hasData()) { + setData(other.getData()); + } + if (other.hasChunkIndex()) { + setChunkIndex(other.getChunkIndex()); + } + if (other.hasTotalChunks()) { + setTotalChunks(other.getTotalChunks()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional int64 term = 1; + private long term_ ; + /** + * optional int64 term = 1; + */ + public boolean hasTerm() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int64 term = 1; + */ + public long getTerm() { + return term_; + } + /** + * optional int64 term = 1; + */ + public Builder setTerm(long value) { + bitField0_ |= 0x00000001; + term_ = value; + onChanged(); + return this; + } + /** + * optional int64 term = 1; + */ + public Builder clearTerm() { + bitField0_ = (bitField0_ & ~0x00000001); + term_ = 0L; + onChanged(); + return this; + } + + // optional string leaderId = 2; + private java.lang.Object leaderId_ = ""; + /** + * optional string leaderId = 2; + */ + public boolean hasLeaderId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string leaderId = 2; + */ + public java.lang.String getLeaderId() { + java.lang.Object ref = leaderId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + leaderId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string leaderId = 2; + */ + public com.google.protobuf.ByteString + getLeaderIdBytes() { + java.lang.Object ref = leaderId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + leaderId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string leaderId = 2; + */ + public Builder setLeaderId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + leaderId_ = value; + onChanged(); + return this; + } + /** + * optional string leaderId = 2; + */ + public Builder clearLeaderId() { + bitField0_ = (bitField0_ & ~0x00000002); + leaderId_ = getDefaultInstance().getLeaderId(); + onChanged(); + return this; + } + /** + * optional string leaderId = 2; + */ + public Builder setLeaderIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + leaderId_ = value; + onChanged(); + return this; + } + + // optional int64 lastIncludedIndex = 3; + private long lastIncludedIndex_ ; + /** + * optional int64 lastIncludedIndex = 3; + */ + public boolean hasLastIncludedIndex() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public long getLastIncludedIndex() { + return lastIncludedIndex_; + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public Builder setLastIncludedIndex(long value) { + bitField0_ |= 0x00000004; + lastIncludedIndex_ = value; + onChanged(); + return this; + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public Builder clearLastIncludedIndex() { + bitField0_ = (bitField0_ & ~0x00000004); + lastIncludedIndex_ = 0L; + onChanged(); + return this; + } + + // optional int64 lastIncludedTerm = 4; + private long lastIncludedTerm_ ; + /** + * optional int64 lastIncludedTerm = 4; + */ + public boolean hasLastIncludedTerm() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public long getLastIncludedTerm() { + return lastIncludedTerm_; + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public Builder setLastIncludedTerm(long value) { + bitField0_ |= 0x00000008; + lastIncludedTerm_ = value; + onChanged(); + return this; + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public Builder clearLastIncludedTerm() { + bitField0_ = (bitField0_ & ~0x00000008); + lastIncludedTerm_ = 0L; + onChanged(); + return this; + } + + // optional bytes data = 5; + private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes data = 5; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes data = 5; + */ + public com.google.protobuf.ByteString getData() { + return data_; + } + /** + * optional bytes data = 5; + */ + public Builder setData(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + data_ = value; + onChanged(); + return this; + } + /** + * optional bytes data = 5; + */ + public Builder clearData() { + bitField0_ = (bitField0_ & ~0x00000010); + data_ = getDefaultInstance().getData(); + onChanged(); + return this; + } + + // optional int32 chunkIndex = 6; + private int chunkIndex_ ; + /** + * optional int32 chunkIndex = 6; + */ + public boolean hasChunkIndex() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int32 chunkIndex = 6; + */ + public int getChunkIndex() { + return chunkIndex_; + } + /** + * optional int32 chunkIndex = 6; + */ + public Builder setChunkIndex(int value) { + bitField0_ |= 0x00000020; + chunkIndex_ = value; + onChanged(); + return this; + } + /** + * optional int32 chunkIndex = 6; + */ + public Builder clearChunkIndex() { + bitField0_ = (bitField0_ & ~0x00000020); + chunkIndex_ = 0; + onChanged(); + return this; + } + + // optional int32 totalChunks = 7; + private int totalChunks_ ; + /** + * optional int32 totalChunks = 7; + */ + public boolean hasTotalChunks() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional int32 totalChunks = 7; + */ + public int getTotalChunks() { + return totalChunks_; + } + /** + * optional int32 totalChunks = 7; + */ + public Builder setTotalChunks(int value) { + bitField0_ |= 0x00000040; + totalChunks_ = value; + onChanged(); + return this; + } + /** + * optional int32 totalChunks = 7; + */ + public Builder clearTotalChunks() { + bitField0_ = (bitField0_ & ~0x00000040); + totalChunks_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot) + } + + static { + defaultInstance = new InstallSnapshot(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\025InstallSnapshot.proto\022(org.opendayligh" + + "t.controller.cluster.raft\"\235\001\n\017InstallSna" + + "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" + + "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" + + "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" + + " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" + + "light.controller.cluster.raft.protobuff." + + "messagesB\027InstallSnapshotMessagesH\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor, + new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto new file mode 100644 index 0000000000..14f821b5e2 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto @@ -0,0 +1,15 @@ +package org.opendaylight.controller.cluster.raft; + +option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages"; +option java_outer_classname = "InstallSnapshotMessages"; +option optimize_for = SPEED; + +message InstallSnapshot { + optional int64 term = 1; + optional string leaderId = 2; + optional int64 lastIncludedIndex = 3; + optional int64 lastIncludedTerm = 4; + optional bytes data = 5; + optional int32 chunkIndex = 6; + optional int32 totalChunks = 7; +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index ea3c9e759d..ca34a34ca4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -14,17 +14,14 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; +import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages; -import com.google.common.base.Preconditions; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; public class MockRaftActorContext implements RaftActorContext { @@ -37,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext { private final ElectionTerm electionTerm; private ReplicatedLog replicatedLog; private Map peerAddresses = new HashMap(); + private ConfigParams configParams; public MockRaftActorContext(){ electionTerm = null; @@ -79,6 +77,8 @@ public class MockRaftActorContext implements RaftActorContext { } }; + configParams = new DefaultConfigParamsImpl(); + initReplicatedLog(); } @@ -179,118 +179,21 @@ public class MockRaftActorContext implements RaftActorContext { @Override public ConfigParams getConfigParams() { - return new DefaultConfigParamsImpl(); + return configParams; } - public static class SimpleReplicatedLog implements ReplicatedLog { - private final List log = new ArrayList<>(); - - @Override public ReplicatedLogEntry get(long index) { - if(index >= log.size() || index < 0){ - return null; - } - return log.get((int) index); - } - - @Override public ReplicatedLogEntry last() { - if(log.size() == 0){ - return null; - } - return log.get(log.size()-1); - } - - @Override public long lastIndex() { - if(log.size() == 0){ - return -1; - } - - return last().getIndex(); - } - - @Override public long lastTerm() { - if(log.size() == 0){ - return -1; - } - - return last().getTerm(); - } - - @Override public void removeFrom(long index) { - if(index >= log.size() || index < 0){ - return; - } - - log.subList((int) index, log.size()).clear(); - //log.remove((int) index); - } - - @Override public void removeFromAndPersist(long index) { - removeFrom(index); - } - - @Override public void append(ReplicatedLogEntry replicatedLogEntry) { - log.add(replicatedLogEntry); - } + public void setConfigParams(ConfigParams configParams) { + this.configParams = configParams; + } + public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl { @Override public void appendAndPersist( ReplicatedLogEntry replicatedLogEntry) { append(replicatedLogEntry); } - @Override public List getFrom(long index) { - if(index >= log.size() || index < 0){ - return Collections.EMPTY_LIST; - } - List entries = new ArrayList<>(); - for(int i=(int) index ; i < log.size() ; i++) { - entries.add(get(i)); - } - return entries; - } - - @Override public List getFrom(long index, int max) { - if(index >= log.size() || index < 0){ - return Collections.EMPTY_LIST; - } - List entries = new ArrayList<>(); - int maxIndex = (int) index + max; - if(maxIndex > log.size()){ - maxIndex = log.size(); - } - - for(int i=(int) index ; i < maxIndex ; i++) { - entries.add(get(i)); - } - return entries; - - } - - @Override public long size() { - return log.size(); - } - - @Override public boolean isPresent(long index) { - if(index >= log.size() || index < 0){ - return false; - } - - return true; - } - - @Override public boolean isInSnapshot(long index) { - return false; - } - - @Override public Object getSnapshot() { - return null; - } - - @Override public long getSnapshotIndex() { - return -1; - } - - @Override public long getSnapshotTerm() { - return -1; + @Override public void removeFromAndPersist(long index) { + removeFrom(index); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index ff0ffeb271..12123db129 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -6,6 +6,7 @@ import akka.actor.Props; import akka.event.Logging; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import com.google.protobuf.ByteString; import org.junit.Test; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; @@ -39,11 +40,11 @@ public class RaftActorTest extends AbstractActorTest { Object data) { } - @Override protected Object createSnapshot() { + @Override protected void createSnapshot() { throw new UnsupportedOperationException("createSnapshot"); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { throw new UnsupportedOperationException("applySnapshot"); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index c5a81aa1c9..227d1effa7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -158,17 +158,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { createActorContext(); context.setLastApplied(100); - setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload("")); + setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload("")); + ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99); List entries = Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101, + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101, new MockRaftActorContext.MockPayload("foo")) ); // The new commitIndex is 101 AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, entries, 101); + new AppendEntries(2, "leader-1", 100, 1, entries, 101); RaftState raftState = createBehavior(context).handleMessage(getRef(), appendEntries); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 17c22a134a..73c9f96b82 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,24 +1,40 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.testkit.JavaTestKit; -import junit.framework.Assert; +import com.google.protobuf.ByteString; +import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation; +import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class LeaderTest extends AbstractRaftActorBehaviorTest { @@ -82,8 +98,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals("match", out); } - - }; }}; } @@ -194,18 +208,372 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals("match", out); } + }; + }}; + } + + @Test + public void testSendInstallSnapshot() { + new LeaderTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(getRef()); + actorContext.setPeerAddresses(peerAddresses); + + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshot( + toByteString(leadersSnapshot)); + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + + MockLeader leader = new MockLeader(actorContext); + // set the follower info in leader + leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex + RaftState raftState = leader.handleMessage( + senderActor, new Replicate(null, "state-id", entry)); + + assertEquals(RaftState.Leader, raftState); + + // we might receive some heartbeat messages, so wait till we SendInstallSnapshot + Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { + @Override + protected Boolean match(Object o) throws Exception { + if (o instanceof SendInstallSnapshot) { + return true; + } + return false; + } + }.get(); + + boolean sendInstallSnapshotReceived = false; + for (Boolean b: matches) { + sendInstallSnapshotReceived = b | sendInstallSnapshotReceived; + } + + assertTrue(sendInstallSnapshotReceived); + + } + }; + }}; + } + + @Test + public void testInstallSnapshot() { + new LeaderTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot)); + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + MockLeader leader = new MockLeader(actorContext); + // set the follower info in leader + leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + + RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot()); + + assertEquals(RaftState.Leader, raftState); + + // check if installsnapshot gets called with the correct values. + final String out = + new ExpectMsg(duration("1 seconds"), "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof InstallSnapshotMessages.InstallSnapshot) { + InstallSnapshot is = (InstallSnapshot) + SerializationUtils.fromSerializable(in); + if (is.getData() == null) { + return "InstallSnapshot data is null"; + } + if (is.getLastIncludedIndex() != snapshotIndex) { + return is.getLastIncludedIndex() + "!=" + snapshotIndex; + } + if (is.getLastIncludedTerm() != snapshotTerm) { + return is.getLastIncludedTerm() + "!=" + snapshotTerm; + } + if (is.getTerm() == currentTerm) { + return is.getTerm() + "!=" + currentTerm; + } + + return "match"; + + } else { + return "message mismatch:" + in.getClass(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + } + }; + }}; + } + + @Test + public void testHandleInstallSnapshotReplyLastChunk() { + new LeaderTestKit(getSystem()) {{ + new Within(duration("1 seconds")) { + protected void run() { + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockLeader leader = new MockLeader(actorContext); + // set the follower info in leader + leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshot( + toByteString(leadersSnapshot)); + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.createFollowerToSnapshot(followerActor.path().toString(), bs); + while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) { + leader.getFollowerToSnapshot().getNextChunk(); + leader.getFollowerToSnapshot().incrementChunkIndex(); + } + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + RaftState raftState = leader.handleMessage(senderActor, + new InstallSnapshotReply(currentTerm, followerActor.path().toString(), + leader.getFollowerToSnapshot().getChunkIndex(), true)); + assertEquals(RaftState.Leader, raftState); + + assertEquals(leader.mapFollowerToSnapshot.size(), 0); + assertEquals(leader.followerToLog.size(), 1); + assertNotNull(leader.followerToLog.get(followerActor.path().toString())); + FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex + 1, fli.getNextIndex().get()); + } }; }}; } + @Test + public void testFollowerToSnapshotLogic() { + + MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + ByteString bs = toByteString(leadersSnapshot); + byte[] barray = bs.toByteArray(); + + leader.createFollowerToSnapshot("followerId", bs); + assertEquals(bs.size(), barray.length); + + int chunkIndex=0; + for (int i=0; i < barray.length; i = i + 50) { + int j = i + 50; + chunkIndex++; + + if (i + 50 > barray.length) { + j = barray.length; + } + + ByteString chunk = leader.getFollowerToSnapshot().getNextChunk(); + assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); + assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex()); + + leader.getFollowerToSnapshot().markSendStatus(true); + if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) { + leader.getFollowerToSnapshot().incrementChunkIndex(); + } + } + + assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks()); + } + + @Override protected RaftActorBehavior createBehavior( RaftActorContext actorContext) { return new Leader(actorContext); } @Override protected RaftActorContext createActorContext() { - return new MockRaftActorContext("test", getSystem(), leaderActor); + return createActorContext(leaderActor); + } + + protected RaftActorContext createActorContext(ActorRef actorRef) { + return new MockRaftActorContext("test", getSystem(), actorRef); + } + + private ByteString toByteString(Map state) { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(state); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } catch (IOException e) { + Assert.fail("IOException in converting Hashmap to Bytestring:" + e); + } + return null; + } + + private static class LeaderTestKit extends JavaTestKit { + + private LeaderTestKit(ActorSystem actorSystem) { + super(actorSystem); + } + + protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(logLevel + ) { + @Override + protected Boolean run() { + return true; + } + }.from(subject.path().toString()) + .message(logMessage) + .occurrences(1).exec(); + + Assert.assertEquals(true, result); + + } + } + + class MockLeader extends Leader { + + FollowerToSnapshot fts; + + public MockLeader(RaftActorContext context){ + super(context); + } + + public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) { + FollowerLogInformation followerLogInformation = + new FollowerLogInformationImpl(followerId, + new AtomicLong(nextIndex), + new AtomicLong(matchIndex)); + followerToLog.put(followerId, followerLogInformation); + } + + public FollowerToSnapshot getFollowerToSnapshot() { + return fts; + } + + public void createFollowerToSnapshot(String followerId, ByteString bs ) { + fts = new FollowerToSnapshot(bs); + mapFollowerToSnapshot.put(followerId, fts); + + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 43a9faa3e4..d8af74c84c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; - +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -365,7 +365,6 @@ public class Shard extends RaftActor { identifier, clientActor.path().toString()); } - } else { LOG.error("Unknown state received {}", data); } @@ -383,11 +382,11 @@ public class Shard extends RaftActor { } - @Override protected Object createSnapshot() { + @Override protected void createSnapshot() { throw new UnsupportedOperationException("createSnapshot"); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { throw new UnsupportedOperationException("applySnapshot"); } -- 2.36.6