Merge "Install snapshot and Reply"
authorMoiz Raja <moraja@cisco.com>
Thu, 4 Sep 2014 21:26:58 +0000 (21:26 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 4 Sep 2014 21:26:58 +0000 (21:26 +0000)
25 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index cbd7ca2d70f5dc090a1e842b75c200cb0c1976b9..c4ff108611d9fbdb177f2ef4ace98bb030d69991 100644 (file)
@@ -12,14 +12,21 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -82,14 +89,63 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    @Override protected Object createSnapshot() {
-        return state;
+    @Override protected void createSnapshot() {
+        ByteString bs = null;
+        try {
+            bs = fromObject(state);
+        } catch (Exception e) {
+            LOG.error("Exception in creating snapshot", e);
+        }
+        getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
-        state.putAll((HashMap) snapshot);
-        LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
+        try {
+            state.putAll((HashMap) toObject(snapshot));
+        } catch (Exception e) {
+           LOG.error("Exception in applying snapshot", e);
+        }
+        LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+    }
+
+    private ByteString fromObject(Object snapshot) throws Exception {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            b = new ByteArrayOutputStream();
+            o = new ObjectOutputStream(b);
+            o.writeObject(snapshot);
+            byte[] snapshotBytes = b.toByteArray();
+            return ByteString.copyFrom(snapshotBytes);
+        } finally {
+            if (o != null) {
+                o.flush();
+                o.close();
+            }
+            if (b != null) {
+                b.close();
+            }
+        }
+    }
+
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs.toByteArray());
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
     }
 
     @Override protected void onStateChanged() {
index d11377dbcb359e58fdbbf3494a20ca6ded6565c4..6192cad2307b99896326619233bf9903f7aeb41a 100644 (file)
@@ -17,4 +17,9 @@ public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
     public long getSnapshotBatchCount() {
         return 50;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return 50;
+    }
 }
index fd6e192bf0497777de2643a1b3f28a2a76b72e42..978ea91089dbcb1a530c9d66f6878ffa06579e2d 100644 (file)
@@ -109,6 +109,8 @@ public class TestDriver {
                 td.printState();
             } else if (command.startsWith("printNodes")) {
                 td.printNodes();
+            } else {
+                System.out.println("Invalid command:" + command);
             }
 
         }
index b5b034afb9cf8edc7635cfca5509c93cbeb457b5..b436bce50061f98d0362ce3df4336c4279977d63 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -16,12 +18,18 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected final List<ReplicatedLogEntry> journal;
-    protected final Object snapshot;
+    protected List<ReplicatedLogEntry> journal;
+    protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
-    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+    // to be used for rollback during save snapshot failure
+    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ByteString previousSnapshot;
+    protected long previousSnapshotIndex = -1;
+    protected long previousSnapshotTerm = -1;
+
+    public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshot = state;
         this.snapshotIndex = snapshotIndex;
@@ -137,11 +145,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex;
+        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
-    public Object getSnapshot() {
+    public ByteString getSnapshot() {
         return snapshot;
     }
 
@@ -160,4 +168,68 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public abstract void removeFromAndPersist(long index);
+
+    @Override
+    public void setSnapshotIndex(long snapshotIndex) {
+        this.snapshotIndex = snapshotIndex;
+    }
+
+    @Override
+    public void setSnapshotTerm(long snapshotTerm) {
+        this.snapshotTerm = snapshotTerm;
+    }
+
+    @Override
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    public void clear(int startIndex, int endIndex) {
+        journal.subList(startIndex, endIndex).clear();
+    }
+
+    @Override
+    public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        snapshottedJournal = new ArrayList<>(journal.size());
+
+        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        previousSnapshotIndex = snapshotIndex;
+        setSnapshotIndex(snapshotCapturedIndex);
+
+        previousSnapshotTerm = snapshotTerm;
+        setSnapshotTerm(snapshotCapturedTerm);
+
+        previousSnapshot = getSnapshot();
+        setSnapshot(snapshot);
+    }
+
+    @Override
+    public void snapshotCommit() {
+        snapshottedJournal.clear();
+        snapshottedJournal = null;
+        previousSnapshotIndex = -1;
+        previousSnapshotTerm = -1;
+        previousSnapshot = null;
+    }
+
+    @Override
+    public void snapshotRollback() {
+        snapshottedJournal.addAll(journal);
+        journal.clear();
+        journal = snapshottedJournal;
+        snapshottedJournal = null;
+
+        snapshotIndex = previousSnapshotIndex;
+        previousSnapshotIndex = -1;
+
+        snapshotTerm = previousSnapshotTerm;
+        previousSnapshotTerm = -1;
+
+        snapshot = previousSnapshot;
+        previousSnapshot = null;
+
+    }
 }
index 4c6434aec457db0b4899973f36effd0b37e7d14a..ed6439d8c33bceb545927fd6c2f892665b160f8a 100644 (file)
@@ -52,4 +52,9 @@ public interface ConfigParams {
      * @return int
      */
     public int getElectionTimeVariance();
+
+    /**
+     * The size (in bytes) of the snapshot chunk sent from Leader
+     */
+    public int getSnapshotChunkSize();
 }
index 6432fa4811beb64ef13f6869e6e252289c2163be..75c237f5035e57abd61c839835ec3c78548a6157 100644 (file)
@@ -25,6 +25,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
+    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -58,4 +60,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getElectionTimeVariance() {
         return ELECTION_TIME_MAX_VARIANCE;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return SNAPSHOT_CHUNK_SIZE;
+    }
 }
index 988789b4011e4f3ba9e9e9abe6b0c3009704c810..296ce2d24aaa24b3920d789697ddb784ccec7bea 100644 (file)
@@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
+    private CaptureSnapshot captureSnapshot = null;
+
+    private volatile boolean hasSnapshotCaptureInitiated = false;
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
@@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog = new ReplicatedLogImpl(snapshot);
 
             context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
             LOG.debug("Applied snapshot to replicatedLog. " +
                 "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.size());
 
             // Apply the snapshot to the actors state
-            applySnapshot(snapshot.getState());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
@@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 applyState.getReplicatedLogEntry().getData());
 
         } else if(message instanceof ApplySnapshot ) {
-            applySnapshot(((ApplySnapshot) message).getSnapshot());
+            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
+
+            LOG.debug("ApplySnapshot called on Follower Actor " +
+                "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            //clears the followers log, sets the snapshot index to ensure adjusted-index works
+            replicatedLog = new ReplicatedLogImpl(snapshot);
+            context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+            LOG.info("SaveSnapshotSuccess received for snapshot");
+
+            context.getReplicatedLog().snapshotCommit();
 
             // TODO: Not sure if we want to be this aggressive with trimming stuff
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
+
+            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
+            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
 
-            // TODO: Handle failure in saving the snapshot
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+                context.getReplicatedLog().getSnapshotIndex(),
+                context.getReplicatedLog().getSnapshotTerm(),
+                context.getReplicatedLog().size());
 
         } else if (message instanceof AddRaftPeer){
 
@@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor {
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
 
+        } else if (message instanceof CaptureSnapshot) {
+            LOG.debug("CaptureSnapshot received by actor");
+            CaptureSnapshot cs = (CaptureSnapshot)message;
+            captureSnapshot = cs;
+            createSnapshot();
+
+        } else if (message instanceof CaptureSnapshotReply){
+            LOG.debug("CaptureSnapshotReply received by actor");
+            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+
+            ByteString stateInBytes = csr.getSnapshot();
+            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            handleCaptureSnapshotReply(stateInBytes);
+
         } else {
+            if (!(message instanceof AppendEntriesMessages.AppendEntries)
+                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
+                LOG.debug("onReceiveCommand: message:" + message.getClass());
+            }
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
@@ -344,7 +394,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @return The current state of the actor
      */
-    protected abstract Object createSnapshot();
+    protected abstract void createSnapshot();
 
     /**
      * This method will be called by the RaftActor during recovery to
@@ -356,7 +406,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(Object snapshot);
+    protected abstract void applySnapshot(ByteString snapshot);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -423,11 +473,39 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return peerAddress;
     }
 
+    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+        // create a snapshot object from the state provided and save it
+        // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+        saveSnapshot(sn);
+
+        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+        //be greedy and remove entries from in-mem journal which are in the snapshot
+        // and update snapshotIndex and snapshotTerm without waiting for the success,
+
+        context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+            captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        captureSnapshot = null;
+        hasSnapshotCaptureInitiated = false;
+    }
+
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getState(),
+            super(ByteString.copyFrom(snapshot.getState()),
                 snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
         }
@@ -476,8 +554,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
-                        // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+                        // when a snaphsot is being taken, captureSnapshot != null
+                        if (hasSnapshotCaptureInitiated == false &&
+                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
@@ -493,26 +573,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
                             LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
                             LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
 
-                            // create a snapshot object from the state provided and save it
-                            // when snapshot is saved async, SaveSnapshotSuccess is raised.
-                            Snapshot sn = Snapshot.create(createSnapshot(),
-                                getFrom(context.getLastApplied() + 1),
-                                lastIndex(), lastTerm(), lastAppliedIndex,
-                                lastAppliedTerm);
-                            saveSnapshot(sn);
-
-                            LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
-
-                            //be greedy and remove entries from in-mem journal which are in the snapshot
-                            // and update snapshotIndex and snapshotTerm without waiting for the success,
-                            // TODO: damage-recovery to be done on failure
-                            journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
-                            snapshotIndex = lastAppliedIndex;
-                            snapshotTerm = lastAppliedTerm;
-
-                            LOG.info("Removed in-memory snapshotted entries, " +
-                                "adjusted snaphsotIndex:{}" +
-                                "and term:{}", snapshotIndex, lastAppliedTerm);
+                            // send a CaptureSnapshot to self to make the expensive operation async.
+                            getSelf().tell(new CaptureSnapshot(
+                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                                null);
+                            hasSnapshotCaptureInitiated = true;
                         }
                         // Send message for replication
                         if (clientActor != null) {
@@ -542,65 +607,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class Snapshot implements Serializable {
-        private final Object state;
-        private final List<ReplicatedLogEntry> unAppliedEntries;
-        private final long lastIndex;
-        private final long lastTerm;
-        private final long lastAppliedIndex;
-        private final long lastAppliedTerm;
-
-        private Snapshot(Object state,
-            List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
-            long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
-            this.state = state;
-            this.unAppliedEntries = unAppliedEntries;
-            this.lastIndex = lastIndex;
-            this.lastTerm = lastTerm;
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        }
-
-
-        public static Snapshot create(Object state,
-            List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
-            long lastAppliedIndex, long lastAppliedTerm) {
-            return new Snapshot(state, entries, lastIndex, lastTerm,
-                lastAppliedIndex, lastAppliedTerm);
-        }
-
-        public Object getState() {
-            return state;
-        }
-
-        public List<ReplicatedLogEntry> getUnAppliedEntries() {
-            return unAppliedEntries;
-        }
-
-        public long getLastTerm() {
-            return lastTerm;
-        }
-
-        public long getLastAppliedIndex() {
-            return lastAppliedIndex;
-        }
-
-        public long getLastAppliedTerm() {
-            return lastAppliedTerm;
-        }
-
-        public String getLogMessage() {
-            StringBuilder sb = new StringBuilder();
-            return sb.append("Snapshot={")
-                .append("lastTerm:" + this.getLastTerm()  + ", ")
-                .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
-                .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
-                .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
-                .toString();
-
-        }
-    }
-
     private class ElectionTermImpl implements ElectionTerm {
         /**
          * Identifier of the actor whose election term information this is
index e6e160bc02bf1fd72305325aefc91a1ac2a9fac0..c17f5448c6e256a97c4f7134959bb6c2d88a0971 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.List;
 
 /**
@@ -118,7 +120,7 @@ public interface ReplicatedLog {
      *
      * @return an object representing the snapshot if it exists. null otherwise
      */
-    Object getSnapshot();
+    ByteString getSnapshot();
 
     /**
      * Get the index of the snapshot
@@ -134,4 +136,49 @@ public interface ReplicatedLog {
      * otherwise
      */
     long getSnapshotTerm();
+
+    /**
+     * sets the snapshot index in the replicated log
+     * @param snapshotIndex
+     */
+    void setSnapshotIndex(long snapshotIndex);
+
+    /**
+     * sets snapshot term
+     * @param snapshotTerm
+     */
+    public void setSnapshotTerm(long snapshotTerm);
+
+    /**
+     * sets the snapshot in bytes
+     * @param snapshot
+     */
+    public void setSnapshot(ByteString snapshot);
+
+    /**
+     * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
+     * @param startIndex
+     * @param endIndex
+     */
+    public void clear(int startIndex, int endIndex);
+
+    /**
+     * Handles all the bookkeeping in order to perform a rollback in the
+     * event of SaveSnapshotFailure
+     * @param snapshot
+     * @param snapshotCapturedIndex
+     * @param snapshotCapturedTerm
+     */
+    public void snapshotPreCommit(ByteString snapshot,
+        long snapshotCapturedIndex, long snapshotCapturedTerm);
+
+    /**
+     * Sets the Replicated log to state after snapshot success.
+     */
+    public void snapshotCommit();
+
+    /**
+     * Restores the replicated log to a state in the event of a save snapshot failure
+     */
+    public void snapshotRollback();
 }
index 374e0fa9ba766cfe0417347fc4d6a4602d21fcdb..2f5ba48f9258d9a47ebf62ac98966db5d1055d99 100644 (file)
@@ -9,12 +9,16 @@
 package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 
 public class SerializationUtils {
 
     public static Object fromSerializable(Object serializable){
         if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
             return AppendEntries.fromSerializable(serializable);
+
+        } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+            return InstallSnapshot.fromSerializable(serializable);
         }
         return serializable;
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
new file mode 100644 (file)
index 0000000..8e0fcca
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+public class Snapshot implements Serializable {
+    private final byte[] state;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final long lastIndex;
+    private final long lastTerm;
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+
+    private Snapshot(byte[] state,
+        List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+        long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+        this.state = state;
+        this.unAppliedEntries = unAppliedEntries;
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+
+    public static Snapshot create(byte[] state,
+        List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        return new Snapshot(state, entries, lastIndex, lastTerm,
+            lastAppliedIndex, lastAppliedTerm);
+    }
+
+    public byte[] getState() {
+        return state;
+    }
+
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return this.lastIndex;
+    }
+
+    public String getLogMessage() {
+        StringBuilder sb = new StringBuilder();
+        return sb.append("Snapshot={")
+            .append("lastTerm:" + this.getLastTerm() + ", ")
+            .append("lastIndex:" + this.getLastIndex()  + ", ")
+            .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
+            .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
+            .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
+            .toString();
+
+    }
+}
index 9739fb2f1b0ccb42e5fcd0446e5769194797be31..c356804223c696e83f10c73e17da35d63d419c85 100644 (file)
@@ -8,16 +8,21 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import org.opendaylight.controller.cluster.raft.Snapshot;
+
 import java.io.Serializable;
 
+/**
+ * Internal message, issued by follower to its actor
+ */
 public class ApplySnapshot implements Serializable {
-    private final Object snapshot;
+    private final Snapshot snapshot;
 
-    public ApplySnapshot(Object snapshot) {
+    public ApplySnapshot(Snapshot snapshot) {
         this.snapshot = snapshot;
     }
 
-    public Object getSnapshot() {
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
new file mode 100644 (file)
index 0000000..bb86e1a
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+public class CaptureSnapshot {
+    private long lastAppliedIndex;
+    private long lastAppliedTerm;
+    private long lastIndex;
+    private long lastTerm;
+
+    public CaptureSnapshot(long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return lastIndex;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
new file mode 100644 (file)
index 0000000..96150db
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import com.google.protobuf.ByteString;
+
+public class CaptureSnapshotReply {
+    private ByteString snapshot;
+
+    public CaptureSnapshotReply(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    public ByteString getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+}
index 251a13d583ec444ac4ca0c1cc028831feeb48958..7e896fed29c4889f6aec5ce39436a1970a50e03b 100644 (file)
@@ -305,6 +305,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param index a log index that is known to be committed
      */
     protected void applyLogToStateMachine(final long index) {
+        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
              i < index + 1; i++) {
@@ -322,15 +323,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             if (replicatedLogEntry != null) {
                 actor().tell(new ApplyState(clientActor, identifier,
                     replicatedLogEntry), actor());
+                newLastApplied = i;
             } else {
+                //if one index is not present in the log, no point in looping
+                // around as the rest wont be present either
                 context.getLogger().error(
-                    "Missing index " + i + " from log. Cannot apply state.");
+                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                break;
             }
         }
         // Send a local message to the local RaftActor (it's derived class to be
         // specific to apply the log to it's index)
-        context.getLogger().debug("Setting last applied to {}", index);
-        context.setLastApplied(index);
+        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        context.setLastApplied(newLastApplied);
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 54e0494b9da65305729afcf00686e3102c31dd00..610fdc987fde7a1a51491ef25dc2f764011b9eda 100644 (file)
@@ -9,17 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.ArrayList;
+
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
+    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
     public Follower(RaftActorContext context) {
         super(context);
 
@@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
+            context.getLogger().debug("Follower is out-of-sync, " +
+                "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                lastIndex(), lastTerm());
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior {
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (§5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+        // check if there are any entries to be applied. last-applied can be equal to last-index
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+            context.getLastApplied() < lastIndex()) {
+            context.getLogger().debug("applyLogToStateMachine, " +
+                "appendEntries.getLeaderCommit():{}," +
+                "context.getLastApplied():{}, lastIndex():{}",
+                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+            handleInstallSnapshot(sender, installSnapshot);
         }
 
         scheduleElection(electionDuration());
@@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+        context.getLogger().debug("InstallSnapshot received by follower " +
+            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        try {
+            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+                // this is the last chunk, create a snapshot object and apply
+
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+                    snapshotChunksCollected.size());
+
+                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+                    new ArrayList<ReplicatedLogEntry>(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm());
+
+                actor().tell(new ApplySnapshot(snapshot), actor());
+
+            } else {
+                // we have more to go
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+            }
+
+            sender.tell(new InstallSnapshotReply(
+                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                true), actor());
+
+        } catch (Exception e) {
+            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            //send reply with success as false. The chunk will be sent again on failure
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                installSnapshot.getChunkIndex(), false), actor());
+        }
+    }
+
     @Override public void close() throws Exception {
         stopElection();
     }
index 234f9db664e4d43e833e4e354e3d3045094dd381..90948ffef7d8a5e1341bb8aede6b03ccf8dae344 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,8 +66,9 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Leader extends AbstractRaftActorBehavior {
 
 
-    private final Map<String, FollowerLogInformation> followerToLog =
+    protected final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
+    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private final Set<String> followers;
 
@@ -246,16 +249,48 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-    private void handleInstallSnapshotReply(InstallSnapshotReply message) {
-        InstallSnapshotReply reply = message;
+    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
+        FollowerToSnapshot followerToSnapshot =
+            mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot != null &&
+            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+            if (reply.isSuccess()) {
+                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                    //this was the last chunk reply
+                    context.getLogger().debug("InstallSnapshotReply received, " +
+                        "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                        reply.getChunkIndex(), followerId,
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+
+                    FollowerLogInformation followerLogInformation =
+                        followerToLog.get(followerId);
+                    followerLogInformation.setMatchIndex(
+                        context.getReplicatedLog().getSnapshotIndex());
+                    followerLogInformation.setNextIndex(
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    mapFollowerToSnapshot.remove(followerId);
+                    context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                        followerToLog.get(followerId).getNextIndex().get());
+
+                } else {
+                    followerToSnapshot.markSendStatus(true);
+                }
+            } else {
+                context.getLogger().info("InstallSnapshotReply received, " +
+                    "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex());
+                followerToSnapshot.markSendStatus(false);
+            }
 
-        followerLogInformation
-            .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
-        followerLogInformation
-            .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+        } else {
+            context.getLogger().error("ERROR!!" +
+                "FollowerId in InstallSnapshotReply not known to Leader" +
+                " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+        }
     }
 
     private void replicate(Replicate replicate) {
@@ -282,30 +317,56 @@ public class Leader extends AbstractRaftActorBehavior {
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
         for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long followerNextIndex = followerLogInformation.getNextIndex().get();
                 List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-                if (context.getReplicatedLog().isPresent(nextIndex)) {
-                    // FIXME : Sending one entry at a time
-                    entries =
-                        context.getReplicatedLog().getFrom(nextIndex, 1);
+                if (mapFollowerToSnapshot.get(followerId) != null) {
+                    if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerId);
+                    }
+
+                } else {
+
+                    if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        // FIXME : Sending one entry at a time
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                        followerActor.tell(
+                            new AppendEntries(currentTerm(), context.getId(),
+                                prevLogIndex(followerNextIndex),
+                                prevLogTerm(followerNextIndex), entries,
+                                context.getCommitIndex()).toSerializable(),
+                            actor()
+                        );
+
+                    } else {
+                        // if the followers next index is not present in the leaders log, then snapshot should be sent
+                        long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                        long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                        if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
+                            // if the follower is just not starting and leader's index
+                            // is more than followers index
+                            context.getLogger().debug("SendInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+
+                            actor().tell(new SendInstallSnapshot(), actor());
+                        } else {
+                            followerActor.tell(
+                                new AppendEntries(currentTerm(), context.getId(),
+                                    prevLogIndex(followerNextIndex),
+                                    prevLogTerm(followerNextIndex), entries,
+                                    context.getCommitIndex()).toSerializable(),
+                                actor()
+                            );
+                        }
+                    }
                 }
-
-                followerActor.tell(
-                    new AppendEntries(currentTerm(), context.getId(),
-                        prevLogIndex(nextIndex),
-                        prevLogTerm(nextIndex), entries,
-                        context.getCommitIndex()).toSerializable(),
-                    actor()
-                );
             }
         }
     }
@@ -326,21 +387,55 @@ public class Leader extends AbstractRaftActorBehavior {
 
                 long nextIndex = followerLogInformation.getNextIndex().get();
 
-                if (!context.getReplicatedLog().isPresent(nextIndex) && context
-                    .getReplicatedLog().isInSnapshot(nextIndex)) {
-                    followerActor.tell(
-                        new InstallSnapshot(currentTerm(), context.getId(),
-                            context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().getSnapshotTerm(),
-                            context.getReplicatedLog().getSnapshot()
-                        ),
-                        actor()
-                    );
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    sendSnapshotChunk(followerActor, followerId);
                 }
             }
         }
     }
 
+    /**
+     *  Sends a snapshot chunk to a given follower
+     *  InstallSnapshot should qualify as a heartbeat too.
+     */
+    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+        try {
+            followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    getNextSnapshotChunk(followerId,
+                        context.getReplicatedLog().getSnapshot()),
+                    mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                ).toSerializable(),
+                actor()
+            );
+            context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                mapFollowerToSnapshot.get(followerId).getTotalChunks());
+        } catch (IOException e) {
+            context.getLogger().error("InstallSnapshot failed for Leader.", e);
+        }
+    }
+
+    /**
+     * Acccepts snaphot as ByteString, enters into map for future chunks
+     * creates and return a ByteString chunk
+     */
+    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        if (followerToSnapshot == null) {
+            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+        }
+        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+
+        return nextChunk;
+    }
+
     private RaftState sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
@@ -410,4 +505,97 @@ public class Leader extends AbstractRaftActorBehavior {
         return context.getId();
     }
 
+    /**
+     * Encapsulates the snapshot bytestring and handles the logic of sending
+     * snapshot chunks
+     */
+    protected class FollowerToSnapshot {
+        private ByteString snapshotBytes;
+        private int offset = 0;
+        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+        private int replyReceivedForOffset;
+        // if replyStatus is false, the previous chunk is attempted
+        private boolean replyStatus = false;
+        private int chunkIndex;
+        private int totalChunks;
+
+        public FollowerToSnapshot(ByteString snapshotBytes) {
+            this.snapshotBytes = snapshotBytes;
+            replyReceivedForOffset = -1;
+            chunkIndex = 1;
+            int size = snapshotBytes.size();
+            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
+                size, totalChunks);
+        }
+
+        public ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+
+        public int incrementOffset() {
+            if(replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                offset = offset + context.getConfigParams().getSnapshotChunkSize();
+            }
+            return offset;
+        }
+
+        public int incrementChunkIndex() {
+            if (replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                chunkIndex =  chunkIndex + 1;
+            }
+            return chunkIndex;
+        }
+
+        public int getChunkIndex() {
+            return chunkIndex;
+        }
+
+        public int getTotalChunks() {
+            return totalChunks;
+        }
+
+        public boolean canSendNextChunk() {
+            // we only send a false if a chunk is sent but we have not received a reply yet
+            return replyReceivedForOffset == offset;
+        }
+
+        public boolean isLastChunk(int chunkIndex) {
+            return totalChunks == chunkIndex;
+        }
+
+        public void markSendStatus(boolean success) {
+            if (success) {
+                // if the chunk sent was successful
+                replyReceivedForOffset = offset;
+                replyStatus = true;
+            } else {
+                // if the chunk sent was failure
+                replyReceivedForOffset = offset;
+                replyStatus = false;
+            }
+        }
+
+        public ByteString getNextChunk() {
+            int snapshotLength = getSnapshotBytes().size();
+            int start = incrementOffset();
+            int size = context.getConfigParams().getSnapshotChunkSize();
+            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+                size = snapshotLength;
+            } else {
+                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                    size = snapshotLength - start;
+                }
+            }
+
+            context.getLogger().debug("length={}, offset={},size={}",
+                snapshotLength, start, size);
+            return getSnapshotBytes().substring(start, start + size);
+
+        }
+    }
+
 }
index 888854fa71eaf4f745e11456c7b09349a7d8a443..9d40fa3d9edb3858969797e929776ddcba424333 100644 (file)
@@ -8,19 +8,29 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+
 public class InstallSnapshot extends AbstractRaftRPC {
 
+    public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+
     private final String leaderId;
     private final long lastIncludedIndex;
     private final long lastIncludedTerm;
-    private final Object data;
+    private final ByteString data;
+    private final int chunkIndex;
+    private final int totalChunks;
 
-    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
         this.lastIncludedTerm = lastIncludedTerm;
         this.data = data;
+        this.chunkIndex = chunkIndex;
+        this.totalChunks = totalChunks;
     }
 
     public String getLeaderId() {
@@ -35,7 +45,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return lastIncludedTerm;
     }
 
-    public Object getData() {
+    public ByteString getData() {
         return data;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public int getTotalChunks() {
+        return totalChunks;
+    }
+
+    public <T extends Object> Object toSerializable(){
+        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
+            .setLeaderId(this.getLeaderId())
+            .setChunkIndex(this.getChunkIndex())
+            .setData(this.getData())
+            .setLastIncludedIndex(this.getLastIncludedIndex())
+            .setLastIncludedTerm(this.getLastIncludedTerm())
+            .setTotalChunks(this.getTotalChunks()).build();
+
+    }
+
+    public static InstallSnapshot fromSerializable (Object o) {
+        InstallSnapshotMessages.InstallSnapshot from =
+            (InstallSnapshotMessages.InstallSnapshot) o;
+
+        InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+            from.getLeaderId(), from.getLastIncludedIndex(),
+            from.getLastIncludedTerm(), from.getData(),
+            from.getChunkIndex(), from.getTotalChunks());
+
+        return installSnapshot;
+    }
 }
index 85b89b70ae2b84c8ab26fcb7e16ba041fbd2040a..d293a47c8efdd1521afa4652f7a6a100e4c692ab 100644 (file)
@@ -13,13 +13,26 @@ public class InstallSnapshotReply extends AbstractRaftRPC {
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
+    private final int chunkIndex;
+    private boolean success;
 
-    protected InstallSnapshotReply(long term, String followerId) {
+    public InstallSnapshotReply(long term, String followerId, int chunkIndex,
+        boolean success) {
         super(term);
         this.followerId = followerId;
+        this.chunkIndex = chunkIndex;
+        this.success = success;
     }
 
     public String getFollowerId() {
         return followerId;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java
new file mode 100644 (file)
index 0000000..e801ae1
--- /dev/null
@@ -0,0 +1,1015 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: InstallSnapshot.proto
+
+package org.opendaylight.controller.cluster.raft.protobuff.messages;
+
+public final class InstallSnapshotMessages {
+  private InstallSnapshotMessages() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface InstallSnapshotOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 term = 1;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    boolean hasTerm();
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    long getTerm();
+
+    // optional string leaderId = 2;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    boolean hasLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    java.lang.String getLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getLeaderIdBytes();
+
+    // optional int64 lastIncludedIndex = 3;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    boolean hasLastIncludedIndex();
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    long getLastIncludedIndex();
+
+    // optional int64 lastIncludedTerm = 4;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    boolean hasLastIncludedTerm();
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    long getLastIncludedTerm();
+
+    // optional bytes data = 5;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    boolean hasData();
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    com.google.protobuf.ByteString getData();
+
+    // optional int32 chunkIndex = 6;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    boolean hasChunkIndex();
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    int getChunkIndex();
+
+    // optional int32 totalChunks = 7;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    boolean hasTotalChunks();
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    int getTotalChunks();
+  }
+  /**
+   * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+   */
+  public static final class InstallSnapshot extends
+      com.google.protobuf.GeneratedMessage
+      implements InstallSnapshotOrBuilder {
+    // Use InstallSnapshot.newBuilder() to construct.
+    private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final InstallSnapshot defaultInstance;
+    public static InstallSnapshot getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public InstallSnapshot getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private InstallSnapshot(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              term_ = input.readInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              leaderId_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              lastIncludedIndex_ = input.readInt64();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              lastIncludedTerm_ = input.readInt64();
+              break;
+            }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              data_ = input.readBytes();
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              chunkIndex_ = input.readInt32();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              totalChunks_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
+        new com.google.protobuf.AbstractParser<InstallSnapshot>() {
+      public InstallSnapshot parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new InstallSnapshot(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<InstallSnapshot> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int64 term = 1;
+    public static final int TERM_FIELD_NUMBER = 1;
+    private long term_;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public boolean hasTerm() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public long getTerm() {
+      return term_;
+    }
+
+    // optional string leaderId = 2;
+    public static final int LEADERID_FIELD_NUMBER = 2;
+    private java.lang.Object leaderId_;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public boolean hasLeaderId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public java.lang.String getLeaderId() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          leaderId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getLeaderIdBytes() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        leaderId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional int64 lastIncludedIndex = 3;
+    public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3;
+    private long lastIncludedIndex_;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public boolean hasLastIncludedIndex() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public long getLastIncludedIndex() {
+      return lastIncludedIndex_;
+    }
+
+    // optional int64 lastIncludedTerm = 4;
+    public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4;
+    private long lastIncludedTerm_;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public boolean hasLastIncludedTerm() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public long getLastIncludedTerm() {
+      return lastIncludedTerm_;
+    }
+
+    // optional bytes data = 5;
+    public static final int DATA_FIELD_NUMBER = 5;
+    private com.google.protobuf.ByteString data_;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public boolean hasData() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public com.google.protobuf.ByteString getData() {
+      return data_;
+    }
+
+    // optional int32 chunkIndex = 6;
+    public static final int CHUNKINDEX_FIELD_NUMBER = 6;
+    private int chunkIndex_;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public boolean hasChunkIndex() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public int getChunkIndex() {
+      return chunkIndex_;
+    }
+
+    // optional int32 totalChunks = 7;
+    public static final int TOTALCHUNKS_FIELD_NUMBER = 7;
+    private int totalChunks_;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public boolean hasTotalChunks() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public int getTotalChunks() {
+      return totalChunks_;
+    }
+
+    private void initFields() {
+      term_ = 0L;
+      leaderId_ = "";
+      lastIncludedIndex_ = 0L;
+      lastIncludedTerm_ = 0L;
+      data_ = com.google.protobuf.ByteString.EMPTY;
+      chunkIndex_ = 0;
+      totalChunks_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt64(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt32(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeInt32(7, totalChunks_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, totalChunks_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+      }
+
+      // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        term_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        leaderId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        lastIncludedIndex_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedTerm_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        data_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        chunkIndex_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        totalChunks_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.term_ = term_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.leaderId_ = leaderId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.lastIncludedIndex_ = lastIncludedIndex_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.lastIncludedTerm_ = lastIncludedTerm_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.data_ = data_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.chunkIndex_ = chunkIndex_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.totalChunks_ = totalChunks_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
+          return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
+        if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+        if (other.hasTerm()) {
+          setTerm(other.getTerm());
+        }
+        if (other.hasLeaderId()) {
+          bitField0_ |= 0x00000002;
+          leaderId_ = other.leaderId_;
+          onChanged();
+        }
+        if (other.hasLastIncludedIndex()) {
+          setLastIncludedIndex(other.getLastIncludedIndex());
+        }
+        if (other.hasLastIncludedTerm()) {
+          setLastIncludedTerm(other.getLastIncludedTerm());
+        }
+        if (other.hasData()) {
+          setData(other.getData());
+        }
+        if (other.hasChunkIndex()) {
+          setChunkIndex(other.getChunkIndex());
+        }
+        if (other.hasTotalChunks()) {
+          setTotalChunks(other.getTotalChunks());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int64 term = 1;
+      private long term_ ;
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public boolean hasTerm() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public long getTerm() {
+        return term_;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder setTerm(long value) {
+        bitField0_ |= 0x00000001;
+        term_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder clearTerm() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        term_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional string leaderId = 2;
+      private java.lang.Object leaderId_ = "";
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public boolean hasLeaderId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public java.lang.String getLeaderId() {
+        java.lang.Object ref = leaderId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          leaderId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getLeaderIdBytes() {
+        java.lang.Object ref = leaderId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          leaderId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder clearLeaderId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        leaderId_ = getDefaultInstance().getLeaderId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedIndex = 3;
+      private long lastIncludedIndex_ ;
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public boolean hasLastIncludedIndex() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public long getLastIncludedIndex() {
+        return lastIncludedIndex_;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder setLastIncludedIndex(long value) {
+        bitField0_ |= 0x00000004;
+        lastIncludedIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder clearLastIncludedIndex() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedIndex_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedTerm = 4;
+      private long lastIncludedTerm_ ;
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public boolean hasLastIncludedTerm() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public long getLastIncludedTerm() {
+        return lastIncludedTerm_;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder setLastIncludedTerm(long value) {
+        bitField0_ |= 0x00000008;
+        lastIncludedTerm_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder clearLastIncludedTerm() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        lastIncludedTerm_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes data = 5;
+      private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public boolean hasData() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public com.google.protobuf.ByteString getData() {
+        return data_;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder setData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        data_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder clearData() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        data_ = getDefaultInstance().getData();
+        onChanged();
+        return this;
+      }
+
+      // optional int32 chunkIndex = 6;
+      private int chunkIndex_ ;
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public boolean hasChunkIndex() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public int getChunkIndex() {
+        return chunkIndex_;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder setChunkIndex(int value) {
+        bitField0_ |= 0x00000020;
+        chunkIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder clearChunkIndex() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        chunkIndex_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional int32 totalChunks = 7;
+      private int totalChunks_ ;
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public boolean hasTotalChunks() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public int getTotalChunks() {
+        return totalChunks_;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder setTotalChunks(int value) {
+        bitField0_ |= 0x00000040;
+        totalChunks_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder clearTotalChunks() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        totalChunks_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+    }
+
+    static {
+      defaultInstance = new InstallSnapshot(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n\025InstallSnapshot.proto\022(org.opendayligh" +
+      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
+      "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
+      "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
+      "light.controller.cluster.raft.protobuff." +
+      "messagesB\027InstallSnapshotMessagesH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto
new file mode 100644 (file)
index 0000000..14f821b
--- /dev/null
@@ -0,0 +1,15 @@
+package org.opendaylight.controller.cluster.raft;
+
+option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_outer_classname = "InstallSnapshotMessages";
+option optimize_for = SPEED;
+
+message InstallSnapshot {
+    optional int64 term = 1;
+    optional string leaderId = 2;
+    optional int64 lastIncludedIndex = 3;
+    optional int64 lastIncludedTerm = 4;
+    optional bytes data = 5;
+    optional int32 chunkIndex = 6;
+    optional int32 totalChunks = 7;
+}
index ea3c9e759d51744fb2dd36212bf8372e2f06f0c8..ca34a34ca49337783ec7fda8a9b9966fba1f16ac 100644 (file)
@@ -14,17 +14,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-import com.google.common.base.Preconditions;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class MockRaftActorContext implements RaftActorContext {
@@ -37,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private final ElectionTerm electionTerm;
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap();
+    private ConfigParams configParams;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -79,6 +77,8 @@ public class MockRaftActorContext implements RaftActorContext {
             }
         };
 
+        configParams = new DefaultConfigParamsImpl();
+
         initReplicatedLog();
     }
 
@@ -179,118 +179,21 @@ public class MockRaftActorContext implements RaftActorContext {
 
     @Override
     public ConfigParams getConfigParams() {
-        return new DefaultConfigParamsImpl();
+        return configParams;
     }
 
-    public static class SimpleReplicatedLog implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> log = new ArrayList<>();
-
-        @Override public ReplicatedLogEntry get(long index) {
-            if(index >= log.size() || index < 0){
-                return null;
-            }
-            return log.get((int) index);
-        }
-
-        @Override public ReplicatedLogEntry last() {
-            if(log.size() == 0){
-                return null;
-            }
-            return log.get(log.size()-1);
-        }
-
-        @Override public long lastIndex() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getIndex();
-        }
-
-        @Override public long lastTerm() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getTerm();
-        }
-
-        @Override public void removeFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return;
-            }
-
-            log.subList((int) index, log.size()).clear();
-            //log.remove((int) index);
-        }
-
-        @Override public void removeFromAndPersist(long index) {
-            removeFrom(index);
-        }
-
-        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
-            log.add(replicatedLogEntry);
-        }
+    public void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
 
+    public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
             append(replicatedLogEntry);
         }
 
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            for(int i=(int) index ; i < log.size() ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            int maxIndex = (int) index + max;
-            if(maxIndex > log.size()){
-                maxIndex = log.size();
-            }
-
-            for(int i=(int) index ; i < maxIndex ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-
-        }
-
-        @Override public long size() {
-            return log.size();
-        }
-
-        @Override public boolean isPresent(long index) {
-            if(index >= log.size() || index < 0){
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override public boolean isInSnapshot(long index) {
-            return false;
-        }
-
-        @Override public Object getSnapshot() {
-            return null;
-        }
-
-        @Override public long getSnapshotIndex() {
-            return -1;
-        }
-
-        @Override public long getSnapshotTerm() {
-            return -1;
+        @Override public void removeFromAndPersist(long index) {
+            removeFrom(index);
         }
     }
 
index ff0ffeb271b55b38455901d03cb31871620b078c..12123db12995061901a39a264c79f0237d78d00a 100644 (file)
@@ -6,6 +6,7 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -39,11 +40,11 @@ public class RaftActorTest extends AbstractActorTest {
             Object data) {
         }
 
-        @Override protected Object createSnapshot() {
+        @Override protected void createSnapshot() {
             throw new UnsupportedOperationException("createSnapshot");
         }
 
-        @Override protected void applySnapshot(Object snapshot) {
+        @Override protected void applySnapshot(ByteString snapshot) {
             throw new UnsupportedOperationException("applySnapshot");
         }
 
index c5a81aa1c9225ea03fa548bf1950d5e73a7e3329..227d1effa7e9b8b3bb65932fe8c25b1a2eecdbf5 100644 (file)
@@ -158,17 +158,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 createActorContext();
 
             context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload(""));
+            setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload(""));
+            ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
 
             List<ReplicatedLogEntry> entries =
                 Arrays.asList(
-                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
                         new MockRaftActorContext.MockPayload("foo"))
                 );
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
index 17c22a134a9a7f26e08998930b2609b128f40c21..73c9f96b82a0a582f4cf5e61b5d68c488f9bc198 100644 (file)
@@ -1,24 +1,40 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.google.protobuf.ByteString;
+import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -82,8 +98,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
-
-
             };
         }};
     }
@@ -194,18 +208,372 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
+            };
+        }};
+    }
+
+    @Test
+    public void testSendInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext(getRef());
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+                    RaftState raftState = leader.handleMessage(
+                        senderActor, new Replicate(null, "state-id", entry));
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
+                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                        @Override
+                        protected Boolean match(Object o) throws Exception {
+                            if (o instanceof SendInstallSnapshot) {
+                                return true;
+                            }
+                            return false;
+                        }
+                    }.get();
+
+                    boolean sendInstallSnapshotReceived = false;
+                    for (Boolean b: matches) {
+                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
+                    }
+
+                    assertTrue(sendInstallSnapshotReceived);
+
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+
+                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // check if installsnapshot gets called with the correct values.
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                                    InstallSnapshot is = (InstallSnapshot)
+                                        SerializationUtils.fromSerializable(in);
+                                    if (is.getData() == null) {
+                                        return "InstallSnapshot data is null";
+                                    }
+                                    if (is.getLastIncludedIndex() != snapshotIndex) {
+                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                                    }
+                                    if (is.getLastIncludedTerm() != snapshotTerm) {
+                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                                    }
+                                    if (is.getTerm() == currentTerm) {
+                                        return is.getTerm() + "!=" + currentTerm;
+                                    }
+
+                                    return "match";
+
+                               } else {
+                                    return "message mismatch:" + in.getClass();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleInstallSnapshotReplyLastChunk() {
+        new LeaderTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    ByteString bs = toByteString(leadersSnapshot);
+                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                        leader.getFollowerToSnapshot().getNextChunk();
+                        leader.getFollowerToSnapshot().incrementChunkIndex();
+                    }
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
 
+                    RaftState raftState = leader.handleMessage(senderActor,
+                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                            leader.getFollowerToSnapshot().getChunkIndex(), true));
 
+                    assertEquals(RaftState.Leader, raftState);
+
+                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+                    assertEquals(leader.followerToLog.size(), 1);
+                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+                }
             };
         }};
     }
 
+    @Test
+    public void testFollowerToSnapshotLogic() {
+
+        MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
+
+        MockLeader leader = new MockLeader(actorContext);
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        ByteString bs = toByteString(leadersSnapshot);
+        byte[] barray = bs.toByteArray();
+
+        leader.createFollowerToSnapshot("followerId", bs);
+        assertEquals(bs.size(), barray.length);
+
+        int chunkIndex=0;
+        for (int i=0; i < barray.length; i = i + 50) {
+            int j = i + 50;
+            chunkIndex++;
+
+            if (i + 50 > barray.length) {
+                j = barray.length;
+            }
+
+            ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+
+            leader.getFollowerToSnapshot().markSendStatus(true);
+            if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+        }
+
+        assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+    }
+
+
     @Override protected RaftActorBehavior createBehavior(
         RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
     @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), leaderActor);
+        return createActorContext(leaderActor);
+    }
+
+    protected RaftActorContext createActorContext(ActorRef actorRef) {
+        return new MockRaftActorContext("test", getSystem(), actorRef);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+    private static class LeaderTestKit extends JavaTestKit {
+
+        private LeaderTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+            new JavaTestKit.EventFilter<Boolean>(logLevel
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(subject.path().toString())
+                .message(logMessage)
+                .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+    }
+
+    class MockLeader extends Leader {
+
+        FollowerToSnapshot fts;
+
+        public MockLeader(RaftActorContext context){
+            super(context);
+        }
+
+        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(nextIndex),
+                    new AtomicLong(matchIndex));
+            followerToLog.put(followerId, followerLogInformation);
+        }
+
+        public FollowerToSnapshot getFollowerToSnapshot() {
+            return fts;
+        }
+
+        public void createFollowerToSnapshot(String followerId, ByteString bs ) {
+            fts = new FollowerToSnapshot(bs);
+            mapFollowerToSnapshot.put(followerId, fts);
+
+        }
     }
 }
index 43a9faa3e44e5d6fe77db7cce74929f8888dcadb..d8af74c84c7a0acd9fdaa106873196b49fd4850e 100644 (file)
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -365,7 +365,6 @@ public class Shard extends RaftActor {
                     identifier, clientActor.path().toString());
             }
 
-
         } else {
             LOG.error("Unknown state received {}", data);
         }
@@ -383,11 +382,11 @@ public class Shard extends RaftActor {
 
     }
 
-    @Override protected Object createSnapshot() {
+    @Override protected void createSnapshot() {
         throw new UnsupportedOperationException("createSnapshot");
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         throw new UnsupportedOperationException("applySnapshot");
     }