Install snapshot and Reply 65/9765/11
authorKamal Rameshan <kramesha@cisco.com>
Wed, 6 Aug 2014 23:25:41 +0000 (16:25 -0700)
committerMoiz Raja <moraja@cisco.com>
Thu, 4 Sep 2014 15:05:41 +0000 (15:05 +0000)
1. InstallSnaphot change in Leader
2. Moved Snapshot out of RaftActor
3. Follower ApplySnapshot changes
4. InstallSnapshotReply changes in Leader
5. Installing Snapshot made in chunks
6. Implemented ProtocolBuffer for InstallSnapshots
7. Sanpshots as ByteStrings
8. Create snapshots made aysnc using CaptureSnapshot messages
9. Save Snapshot Failure Rollback
10. LeaderTest for send snapshot and snaphot reply

Change-Id: I16acc20ac7f671e89c5fbdb7aebd6dbe72e5f6e0
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
25 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index cbd7ca2d70f5dc090a1e842b75c200cb0c1976b9..c4ff108611d9fbdb177f2ef4ace98bb030d69991 100644 (file)
@@ -12,14 +12,21 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -82,14 +89,63 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    @Override protected Object createSnapshot() {
-        return state;
+    @Override protected void createSnapshot() {
+        ByteString bs = null;
+        try {
+            bs = fromObject(state);
+        } catch (Exception e) {
+            LOG.error("Exception in creating snapshot", e);
+        }
+        getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
-        state.putAll((HashMap) snapshot);
-        LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
+        try {
+            state.putAll((HashMap) toObject(snapshot));
+        } catch (Exception e) {
+           LOG.error("Exception in applying snapshot", e);
+        }
+        LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+    }
+
+    private ByteString fromObject(Object snapshot) throws Exception {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            b = new ByteArrayOutputStream();
+            o = new ObjectOutputStream(b);
+            o.writeObject(snapshot);
+            byte[] snapshotBytes = b.toByteArray();
+            return ByteString.copyFrom(snapshotBytes);
+        } finally {
+            if (o != null) {
+                o.flush();
+                o.close();
+            }
+            if (b != null) {
+                b.close();
+            }
+        }
+    }
+
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs.toByteArray());
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
     }
 
     @Override protected void onStateChanged() {
index d11377dbcb359e58fdbbf3494a20ca6ded6565c4..6192cad2307b99896326619233bf9903f7aeb41a 100644 (file)
@@ -17,4 +17,9 @@ public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
     public long getSnapshotBatchCount() {
         return 50;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return 50;
+    }
 }
index fd6e192bf0497777de2643a1b3f28a2a76b72e42..978ea91089dbcb1a530c9d66f6878ffa06579e2d 100644 (file)
@@ -109,6 +109,8 @@ public class TestDriver {
                 td.printState();
             } else if (command.startsWith("printNodes")) {
                 td.printNodes();
+            } else {
+                System.out.println("Invalid command:" + command);
             }
 
         }
index b5b034afb9cf8edc7635cfca5509c93cbeb457b5..b436bce50061f98d0362ce3df4336c4279977d63 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -16,12 +18,18 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected final List<ReplicatedLogEntry> journal;
-    protected final Object snapshot;
+    protected List<ReplicatedLogEntry> journal;
+    protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
-    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+    // to be used for rollback during save snapshot failure
+    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ByteString previousSnapshot;
+    protected long previousSnapshotIndex = -1;
+    protected long previousSnapshotTerm = -1;
+
+    public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshot = state;
         this.snapshotIndex = snapshotIndex;
@@ -137,11 +145,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex;
+        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
-    public Object getSnapshot() {
+    public ByteString getSnapshot() {
         return snapshot;
     }
 
@@ -160,4 +168,68 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public abstract void removeFromAndPersist(long index);
+
+    @Override
+    public void setSnapshotIndex(long snapshotIndex) {
+        this.snapshotIndex = snapshotIndex;
+    }
+
+    @Override
+    public void setSnapshotTerm(long snapshotTerm) {
+        this.snapshotTerm = snapshotTerm;
+    }
+
+    @Override
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    public void clear(int startIndex, int endIndex) {
+        journal.subList(startIndex, endIndex).clear();
+    }
+
+    @Override
+    public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        snapshottedJournal = new ArrayList<>(journal.size());
+
+        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        previousSnapshotIndex = snapshotIndex;
+        setSnapshotIndex(snapshotCapturedIndex);
+
+        previousSnapshotTerm = snapshotTerm;
+        setSnapshotTerm(snapshotCapturedTerm);
+
+        previousSnapshot = getSnapshot();
+        setSnapshot(snapshot);
+    }
+
+    @Override
+    public void snapshotCommit() {
+        snapshottedJournal.clear();
+        snapshottedJournal = null;
+        previousSnapshotIndex = -1;
+        previousSnapshotTerm = -1;
+        previousSnapshot = null;
+    }
+
+    @Override
+    public void snapshotRollback() {
+        snapshottedJournal.addAll(journal);
+        journal.clear();
+        journal = snapshottedJournal;
+        snapshottedJournal = null;
+
+        snapshotIndex = previousSnapshotIndex;
+        previousSnapshotIndex = -1;
+
+        snapshotTerm = previousSnapshotTerm;
+        previousSnapshotTerm = -1;
+
+        snapshot = previousSnapshot;
+        previousSnapshot = null;
+
+    }
 }
index 4c6434aec457db0b4899973f36effd0b37e7d14a..ed6439d8c33bceb545927fd6c2f892665b160f8a 100644 (file)
@@ -52,4 +52,9 @@ public interface ConfigParams {
      * @return int
      */
     public int getElectionTimeVariance();
+
+    /**
+     * The size (in bytes) of the snapshot chunk sent from Leader
+     */
+    public int getSnapshotChunkSize();
 }
index 6432fa4811beb64ef13f6869e6e252289c2163be..75c237f5035e57abd61c839835ec3c78548a6157 100644 (file)
@@ -25,6 +25,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
+    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -58,4 +60,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getElectionTimeVariance() {
         return ELECTION_TIME_MAX_VARIANCE;
     }
+
+    @Override
+    public int getSnapshotChunkSize() {
+        return SNAPSHOT_CHUNK_SIZE;
+    }
 }
index 988789b4011e4f3ba9e9e9abe6b0c3009704c810..296ce2d24aaa24b3920d789697ddb784ccec7bea 100644 (file)
@@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
+    private CaptureSnapshot captureSnapshot = null;
+
+    private volatile boolean hasSnapshotCaptureInitiated = false;
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
@@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog = new ReplicatedLogImpl(snapshot);
 
             context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
             LOG.debug("Applied snapshot to replicatedLog. " +
                 "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.size());
 
             // Apply the snapshot to the actors state
-            applySnapshot(snapshot.getState());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
@@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 applyState.getReplicatedLogEntry().getData());
 
         } else if(message instanceof ApplySnapshot ) {
-            applySnapshot(((ApplySnapshot) message).getSnapshot());
+            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
+
+            LOG.debug("ApplySnapshot called on Follower Actor " +
+                "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            //clears the followers log, sets the snapshot index to ensure adjusted-index works
+            replicatedLog = new ReplicatedLogImpl(snapshot);
+            context.setReplicatedLog(replicatedLog);
+            context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+            LOG.info("SaveSnapshotSuccess received for snapshot");
+
+            context.getReplicatedLog().snapshotCommit();
 
             // TODO: Not sure if we want to be this aggressive with trimming stuff
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
+
+            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
+            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
 
-            // TODO: Handle failure in saving the snapshot
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+                context.getReplicatedLog().getSnapshotIndex(),
+                context.getReplicatedLog().getSnapshotTerm(),
+                context.getReplicatedLog().size());
 
         } else if (message instanceof AddRaftPeer){
 
@@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor {
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
 
+        } else if (message instanceof CaptureSnapshot) {
+            LOG.debug("CaptureSnapshot received by actor");
+            CaptureSnapshot cs = (CaptureSnapshot)message;
+            captureSnapshot = cs;
+            createSnapshot();
+
+        } else if (message instanceof CaptureSnapshotReply){
+            LOG.debug("CaptureSnapshotReply received by actor");
+            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+
+            ByteString stateInBytes = csr.getSnapshot();
+            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            handleCaptureSnapshotReply(stateInBytes);
+
         } else {
+            if (!(message instanceof AppendEntriesMessages.AppendEntries)
+                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
+                LOG.debug("onReceiveCommand: message:" + message.getClass());
+            }
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
@@ -344,7 +394,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @return The current state of the actor
      */
-    protected abstract Object createSnapshot();
+    protected abstract void createSnapshot();
 
     /**
      * This method will be called by the RaftActor during recovery to
@@ -356,7 +406,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(Object snapshot);
+    protected abstract void applySnapshot(ByteString snapshot);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -423,11 +473,39 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return peerAddress;
     }
 
+    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+        // create a snapshot object from the state provided and save it
+        // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+        saveSnapshot(sn);
+
+        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+        //be greedy and remove entries from in-mem journal which are in the snapshot
+        // and update snapshotIndex and snapshotTerm without waiting for the success,
+
+        context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+            captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+            captureSnapshot.getLastAppliedTerm());
+
+        captureSnapshot = null;
+        hasSnapshotCaptureInitiated = false;
+    }
+
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getState(),
+            super(ByteString.copyFrom(snapshot.getState()),
                 snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
         }
@@ -476,8 +554,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
-                        // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+                        // when a snaphsot is being taken, captureSnapshot != null
+                        if (hasSnapshotCaptureInitiated == false &&
+                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
@@ -493,26 +573,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
                             LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
                             LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
 
-                            // create a snapshot object from the state provided and save it
-                            // when snapshot is saved async, SaveSnapshotSuccess is raised.
-                            Snapshot sn = Snapshot.create(createSnapshot(),
-                                getFrom(context.getLastApplied() + 1),
-                                lastIndex(), lastTerm(), lastAppliedIndex,
-                                lastAppliedTerm);
-                            saveSnapshot(sn);
-
-                            LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
-
-                            //be greedy and remove entries from in-mem journal which are in the snapshot
-                            // and update snapshotIndex and snapshotTerm without waiting for the success,
-                            // TODO: damage-recovery to be done on failure
-                            journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
-                            snapshotIndex = lastAppliedIndex;
-                            snapshotTerm = lastAppliedTerm;
-
-                            LOG.info("Removed in-memory snapshotted entries, " +
-                                "adjusted snaphsotIndex:{}" +
-                                "and term:{}", snapshotIndex, lastAppliedTerm);
+                            // send a CaptureSnapshot to self to make the expensive operation async.
+                            getSelf().tell(new CaptureSnapshot(
+                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                                null);
+                            hasSnapshotCaptureInitiated = true;
                         }
                         // Send message for replication
                         if (clientActor != null) {
@@ -542,65 +607,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class Snapshot implements Serializable {
-        private final Object state;
-        private final List<ReplicatedLogEntry> unAppliedEntries;
-        private final long lastIndex;
-        private final long lastTerm;
-        private final long lastAppliedIndex;
-        private final long lastAppliedTerm;
-
-        private Snapshot(Object state,
-            List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
-            long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
-            this.state = state;
-            this.unAppliedEntries = unAppliedEntries;
-            this.lastIndex = lastIndex;
-            this.lastTerm = lastTerm;
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        }
-
-
-        public static Snapshot create(Object state,
-            List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
-            long lastAppliedIndex, long lastAppliedTerm) {
-            return new Snapshot(state, entries, lastIndex, lastTerm,
-                lastAppliedIndex, lastAppliedTerm);
-        }
-
-        public Object getState() {
-            return state;
-        }
-
-        public List<ReplicatedLogEntry> getUnAppliedEntries() {
-            return unAppliedEntries;
-        }
-
-        public long getLastTerm() {
-            return lastTerm;
-        }
-
-        public long getLastAppliedIndex() {
-            return lastAppliedIndex;
-        }
-
-        public long getLastAppliedTerm() {
-            return lastAppliedTerm;
-        }
-
-        public String getLogMessage() {
-            StringBuilder sb = new StringBuilder();
-            return sb.append("Snapshot={")
-                .append("lastTerm:" + this.getLastTerm()  + ", ")
-                .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
-                .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
-                .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
-                .toString();
-
-        }
-    }
-
     private class ElectionTermImpl implements ElectionTerm {
         /**
          * Identifier of the actor whose election term information this is
index e6e160bc02bf1fd72305325aefc91a1ac2a9fac0..c17f5448c6e256a97c4f7134959bb6c2d88a0971 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.protobuf.ByteString;
+
 import java.util.List;
 
 /**
@@ -118,7 +120,7 @@ public interface ReplicatedLog {
      *
      * @return an object representing the snapshot if it exists. null otherwise
      */
-    Object getSnapshot();
+    ByteString getSnapshot();
 
     /**
      * Get the index of the snapshot
@@ -134,4 +136,49 @@ public interface ReplicatedLog {
      * otherwise
      */
     long getSnapshotTerm();
+
+    /**
+     * sets the snapshot index in the replicated log
+     * @param snapshotIndex
+     */
+    void setSnapshotIndex(long snapshotIndex);
+
+    /**
+     * sets snapshot term
+     * @param snapshotTerm
+     */
+    public void setSnapshotTerm(long snapshotTerm);
+
+    /**
+     * sets the snapshot in bytes
+     * @param snapshot
+     */
+    public void setSnapshot(ByteString snapshot);
+
+    /**
+     * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
+     * @param startIndex
+     * @param endIndex
+     */
+    public void clear(int startIndex, int endIndex);
+
+    /**
+     * Handles all the bookkeeping in order to perform a rollback in the
+     * event of SaveSnapshotFailure
+     * @param snapshot
+     * @param snapshotCapturedIndex
+     * @param snapshotCapturedTerm
+     */
+    public void snapshotPreCommit(ByteString snapshot,
+        long snapshotCapturedIndex, long snapshotCapturedTerm);
+
+    /**
+     * Sets the Replicated log to state after snapshot success.
+     */
+    public void snapshotCommit();
+
+    /**
+     * Restores the replicated log to a state in the event of a save snapshot failure
+     */
+    public void snapshotRollback();
 }
index 374e0fa9ba766cfe0417347fc4d6a4602d21fcdb..2f5ba48f9258d9a47ebf62ac98966db5d1055d99 100644 (file)
@@ -9,12 +9,16 @@
 package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 
 public class SerializationUtils {
 
     public static Object fromSerializable(Object serializable){
         if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
             return AppendEntries.fromSerializable(serializable);
+
+        } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+            return InstallSnapshot.fromSerializable(serializable);
         }
         return serializable;
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
new file mode 100644 (file)
index 0000000..8e0fcca
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+public class Snapshot implements Serializable {
+    private final byte[] state;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final long lastIndex;
+    private final long lastTerm;
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+
+    private Snapshot(byte[] state,
+        List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+        long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+        this.state = state;
+        this.unAppliedEntries = unAppliedEntries;
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+
+    public static Snapshot create(byte[] state,
+        List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        return new Snapshot(state, entries, lastIndex, lastTerm,
+            lastAppliedIndex, lastAppliedTerm);
+    }
+
+    public byte[] getState() {
+        return state;
+    }
+
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return this.lastIndex;
+    }
+
+    public String getLogMessage() {
+        StringBuilder sb = new StringBuilder();
+        return sb.append("Snapshot={")
+            .append("lastTerm:" + this.getLastTerm() + ", ")
+            .append("lastIndex:" + this.getLastIndex()  + ", ")
+            .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
+            .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
+            .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
+            .toString();
+
+    }
+}
index 9739fb2f1b0ccb42e5fcd0446e5769194797be31..c356804223c696e83f10c73e17da35d63d419c85 100644 (file)
@@ -8,16 +8,21 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import org.opendaylight.controller.cluster.raft.Snapshot;
+
 import java.io.Serializable;
 
+/**
+ * Internal message, issued by follower to its actor
+ */
 public class ApplySnapshot implements Serializable {
-    private final Object snapshot;
+    private final Snapshot snapshot;
 
-    public ApplySnapshot(Object snapshot) {
+    public ApplySnapshot(Snapshot snapshot) {
         this.snapshot = snapshot;
     }
 
-    public Object getSnapshot() {
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
new file mode 100644 (file)
index 0000000..bb86e1a
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+public class CaptureSnapshot {
+    private long lastAppliedIndex;
+    private long lastAppliedTerm;
+    private long lastIndex;
+    private long lastTerm;
+
+    public CaptureSnapshot(long lastIndex, long lastTerm,
+        long lastAppliedIndex, long lastAppliedTerm) {
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return lastIndex;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
new file mode 100644 (file)
index 0000000..96150db
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import com.google.protobuf.ByteString;
+
+public class CaptureSnapshotReply {
+    private ByteString snapshot;
+
+    public CaptureSnapshotReply(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    public ByteString getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+}
index 251a13d583ec444ac4ca0c1cc028831feeb48958..7e896fed29c4889f6aec5ce39436a1970a50e03b 100644 (file)
@@ -305,6 +305,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param index a log index that is known to be committed
      */
     protected void applyLogToStateMachine(final long index) {
+        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
              i < index + 1; i++) {
@@ -322,15 +323,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             if (replicatedLogEntry != null) {
                 actor().tell(new ApplyState(clientActor, identifier,
                     replicatedLogEntry), actor());
+                newLastApplied = i;
             } else {
+                //if one index is not present in the log, no point in looping
+                // around as the rest wont be present either
                 context.getLogger().error(
-                    "Missing index " + i + " from log. Cannot apply state.");
+                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                break;
             }
         }
         // Send a local message to the local RaftActor (it's derived class to be
         // specific to apply the log to it's index)
-        context.getLogger().debug("Setting last applied to {}", index);
-        context.setLastApplied(index);
+        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        context.setLastApplied(newLastApplied);
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 54e0494b9da65305729afcf00686e3102c31dd00..610fdc987fde7a1a51491ef25dc2f764011b9eda 100644 (file)
@@ -9,17 +9,22 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.ArrayList;
+
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
+    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
     public Follower(RaftActorContext context) {
         super(context);
 
@@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
+            context.getLogger().debug("Follower is out-of-sync, " +
+                "so sending negative reply, lastIndex():{}, lastTerm():{}",
+                lastIndex(), lastTerm());
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior {
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (§5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+        // check if there are any entries to be applied. last-applied can be equal to last-index
+        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+            context.getLastApplied() < lastIndex()) {
+            context.getLogger().debug("applyLogToStateMachine, " +
+                "appendEntries.getLeaderCommit():{}," +
+                "context.getLastApplied():{}, lastIndex():{}",
+                appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
@@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+            handleInstallSnapshot(sender, installSnapshot);
         }
 
         scheduleElection(electionDuration());
@@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+        context.getLogger().debug("InstallSnapshot received by follower " +
+            "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+        try {
+            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+                // this is the last chunk, create a snapshot object and apply
+
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+                    snapshotChunksCollected.size());
+
+                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+                    new ArrayList<ReplicatedLogEntry>(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm(),
+                    installSnapshot.getLastIncludedIndex(),
+                    installSnapshot.getLastIncludedTerm());
+
+                actor().tell(new ApplySnapshot(snapshot), actor());
+
+            } else {
+                // we have more to go
+                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+                    installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+            }
+
+            sender.tell(new InstallSnapshotReply(
+                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                true), actor());
+
+        } catch (Exception e) {
+            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            //send reply with success as false. The chunk will be sent again on failure
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                installSnapshot.getChunkIndex(), false), actor());
+        }
+    }
+
     @Override public void close() throws Exception {
         stopElection();
     }
index 234f9db664e4d43e833e4e354e3d3045094dd381..90948ffef7d8a5e1341bb8aede6b03ccf8dae344 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,8 +66,9 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Leader extends AbstractRaftActorBehavior {
 
 
-    private final Map<String, FollowerLogInformation> followerToLog =
+    protected final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
+    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private final Set<String> followers;
 
@@ -246,16 +249,48 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-    private void handleInstallSnapshotReply(InstallSnapshotReply message) {
-        InstallSnapshotReply reply = message;
+    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
+        FollowerToSnapshot followerToSnapshot =
+            mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot != null &&
+            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+            if (reply.isSuccess()) {
+                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                    //this was the last chunk reply
+                    context.getLogger().debug("InstallSnapshotReply received, " +
+                        "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                        reply.getChunkIndex(), followerId,
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+
+                    FollowerLogInformation followerLogInformation =
+                        followerToLog.get(followerId);
+                    followerLogInformation.setMatchIndex(
+                        context.getReplicatedLog().getSnapshotIndex());
+                    followerLogInformation.setNextIndex(
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    mapFollowerToSnapshot.remove(followerId);
+                    context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                        followerToLog.get(followerId).getNextIndex().get());
+
+                } else {
+                    followerToSnapshot.markSendStatus(true);
+                }
+            } else {
+                context.getLogger().info("InstallSnapshotReply received, " +
+                    "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex());
+                followerToSnapshot.markSendStatus(false);
+            }
 
-        followerLogInformation
-            .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
-        followerLogInformation
-            .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+        } else {
+            context.getLogger().error("ERROR!!" +
+                "FollowerId in InstallSnapshotReply not known to Leader" +
+                " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+        }
     }
 
     private void replicate(Replicate replicate) {
@@ -282,30 +317,56 @@ public class Leader extends AbstractRaftActorBehavior {
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
         for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long followerNextIndex = followerLogInformation.getNextIndex().get();
                 List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-                if (context.getReplicatedLog().isPresent(nextIndex)) {
-                    // FIXME : Sending one entry at a time
-                    entries =
-                        context.getReplicatedLog().getFrom(nextIndex, 1);
+                if (mapFollowerToSnapshot.get(followerId) != null) {
+                    if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerId);
+                    }
+
+                } else {
+
+                    if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        // FIXME : Sending one entry at a time
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                        followerActor.tell(
+                            new AppendEntries(currentTerm(), context.getId(),
+                                prevLogIndex(followerNextIndex),
+                                prevLogTerm(followerNextIndex), entries,
+                                context.getCommitIndex()).toSerializable(),
+                            actor()
+                        );
+
+                    } else {
+                        // if the followers next index is not present in the leaders log, then snapshot should be sent
+                        long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                        long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                        if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
+                            // if the follower is just not starting and leader's index
+                            // is more than followers index
+                            context.getLogger().debug("SendInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+
+                            actor().tell(new SendInstallSnapshot(), actor());
+                        } else {
+                            followerActor.tell(
+                                new AppendEntries(currentTerm(), context.getId(),
+                                    prevLogIndex(followerNextIndex),
+                                    prevLogTerm(followerNextIndex), entries,
+                                    context.getCommitIndex()).toSerializable(),
+                                actor()
+                            );
+                        }
+                    }
                 }
-
-                followerActor.tell(
-                    new AppendEntries(currentTerm(), context.getId(),
-                        prevLogIndex(nextIndex),
-                        prevLogTerm(nextIndex), entries,
-                        context.getCommitIndex()).toSerializable(),
-                    actor()
-                );
             }
         }
     }
@@ -326,21 +387,55 @@ public class Leader extends AbstractRaftActorBehavior {
 
                 long nextIndex = followerLogInformation.getNextIndex().get();
 
-                if (!context.getReplicatedLog().isPresent(nextIndex) && context
-                    .getReplicatedLog().isInSnapshot(nextIndex)) {
-                    followerActor.tell(
-                        new InstallSnapshot(currentTerm(), context.getId(),
-                            context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().getSnapshotTerm(),
-                            context.getReplicatedLog().getSnapshot()
-                        ),
-                        actor()
-                    );
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    sendSnapshotChunk(followerActor, followerId);
                 }
             }
         }
     }
 
+    /**
+     *  Sends a snapshot chunk to a given follower
+     *  InstallSnapshot should qualify as a heartbeat too.
+     */
+    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+        try {
+            followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    getNextSnapshotChunk(followerId,
+                        context.getReplicatedLog().getSnapshot()),
+                    mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                ).toSerializable(),
+                actor()
+            );
+            context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                mapFollowerToSnapshot.get(followerId).getTotalChunks());
+        } catch (IOException e) {
+            context.getLogger().error("InstallSnapshot failed for Leader.", e);
+        }
+    }
+
+    /**
+     * Acccepts snaphot as ByteString, enters into map for future chunks
+     * creates and return a ByteString chunk
+     */
+    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        if (followerToSnapshot == null) {
+            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+        }
+        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+
+        return nextChunk;
+    }
+
     private RaftState sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
@@ -410,4 +505,97 @@ public class Leader extends AbstractRaftActorBehavior {
         return context.getId();
     }
 
+    /**
+     * Encapsulates the snapshot bytestring and handles the logic of sending
+     * snapshot chunks
+     */
+    protected class FollowerToSnapshot {
+        private ByteString snapshotBytes;
+        private int offset = 0;
+        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+        private int replyReceivedForOffset;
+        // if replyStatus is false, the previous chunk is attempted
+        private boolean replyStatus = false;
+        private int chunkIndex;
+        private int totalChunks;
+
+        public FollowerToSnapshot(ByteString snapshotBytes) {
+            this.snapshotBytes = snapshotBytes;
+            replyReceivedForOffset = -1;
+            chunkIndex = 1;
+            int size = snapshotBytes.size();
+            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
+                size, totalChunks);
+        }
+
+        public ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+
+        public int incrementOffset() {
+            if(replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                offset = offset + context.getConfigParams().getSnapshotChunkSize();
+            }
+            return offset;
+        }
+
+        public int incrementChunkIndex() {
+            if (replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                chunkIndex =  chunkIndex + 1;
+            }
+            return chunkIndex;
+        }
+
+        public int getChunkIndex() {
+            return chunkIndex;
+        }
+
+        public int getTotalChunks() {
+            return totalChunks;
+        }
+
+        public boolean canSendNextChunk() {
+            // we only send a false if a chunk is sent but we have not received a reply yet
+            return replyReceivedForOffset == offset;
+        }
+
+        public boolean isLastChunk(int chunkIndex) {
+            return totalChunks == chunkIndex;
+        }
+
+        public void markSendStatus(boolean success) {
+            if (success) {
+                // if the chunk sent was successful
+                replyReceivedForOffset = offset;
+                replyStatus = true;
+            } else {
+                // if the chunk sent was failure
+                replyReceivedForOffset = offset;
+                replyStatus = false;
+            }
+        }
+
+        public ByteString getNextChunk() {
+            int snapshotLength = getSnapshotBytes().size();
+            int start = incrementOffset();
+            int size = context.getConfigParams().getSnapshotChunkSize();
+            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+                size = snapshotLength;
+            } else {
+                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                    size = snapshotLength - start;
+                }
+            }
+
+            context.getLogger().debug("length={}, offset={},size={}",
+                snapshotLength, start, size);
+            return getSnapshotBytes().substring(start, start + size);
+
+        }
+    }
+
 }
index 888854fa71eaf4f745e11456c7b09349a7d8a443..9d40fa3d9edb3858969797e929776ddcba424333 100644 (file)
@@ -8,19 +8,29 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+
 public class InstallSnapshot extends AbstractRaftRPC {
 
+    public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+
     private final String leaderId;
     private final long lastIncludedIndex;
     private final long lastIncludedTerm;
-    private final Object data;
+    private final ByteString data;
+    private final int chunkIndex;
+    private final int totalChunks;
 
-    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
         this.lastIncludedTerm = lastIncludedTerm;
         this.data = data;
+        this.chunkIndex = chunkIndex;
+        this.totalChunks = totalChunks;
     }
 
     public String getLeaderId() {
@@ -35,7 +45,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return lastIncludedTerm;
     }
 
-    public Object getData() {
+    public ByteString getData() {
         return data;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public int getTotalChunks() {
+        return totalChunks;
+    }
+
+    public <T extends Object> Object toSerializable(){
+        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
+            .setLeaderId(this.getLeaderId())
+            .setChunkIndex(this.getChunkIndex())
+            .setData(this.getData())
+            .setLastIncludedIndex(this.getLastIncludedIndex())
+            .setLastIncludedTerm(this.getLastIncludedTerm())
+            .setTotalChunks(this.getTotalChunks()).build();
+
+    }
+
+    public static InstallSnapshot fromSerializable (Object o) {
+        InstallSnapshotMessages.InstallSnapshot from =
+            (InstallSnapshotMessages.InstallSnapshot) o;
+
+        InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+            from.getLeaderId(), from.getLastIncludedIndex(),
+            from.getLastIncludedTerm(), from.getData(),
+            from.getChunkIndex(), from.getTotalChunks());
+
+        return installSnapshot;
+    }
 }
index 85b89b70ae2b84c8ab26fcb7e16ba041fbd2040a..d293a47c8efdd1521afa4652f7a6a100e4c692ab 100644 (file)
@@ -13,13 +13,26 @@ public class InstallSnapshotReply extends AbstractRaftRPC {
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
+    private final int chunkIndex;
+    private boolean success;
 
-    protected InstallSnapshotReply(long term, String followerId) {
+    public InstallSnapshotReply(long term, String followerId, int chunkIndex,
+        boolean success) {
         super(term);
         this.followerId = followerId;
+        this.chunkIndex = chunkIndex;
+        this.success = success;
     }
 
     public String getFollowerId() {
         return followerId;
     }
+
+    public int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java
new file mode 100644 (file)
index 0000000..e801ae1
--- /dev/null
@@ -0,0 +1,1015 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: InstallSnapshot.proto
+
+package org.opendaylight.controller.cluster.raft.protobuff.messages;
+
+public final class InstallSnapshotMessages {
+  private InstallSnapshotMessages() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface InstallSnapshotOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 term = 1;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    boolean hasTerm();
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    long getTerm();
+
+    // optional string leaderId = 2;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    boolean hasLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    java.lang.String getLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getLeaderIdBytes();
+
+    // optional int64 lastIncludedIndex = 3;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    boolean hasLastIncludedIndex();
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    long getLastIncludedIndex();
+
+    // optional int64 lastIncludedTerm = 4;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    boolean hasLastIncludedTerm();
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    long getLastIncludedTerm();
+
+    // optional bytes data = 5;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    boolean hasData();
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    com.google.protobuf.ByteString getData();
+
+    // optional int32 chunkIndex = 6;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    boolean hasChunkIndex();
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    int getChunkIndex();
+
+    // optional int32 totalChunks = 7;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    boolean hasTotalChunks();
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    int getTotalChunks();
+  }
+  /**
+   * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+   */
+  public static final class InstallSnapshot extends
+      com.google.protobuf.GeneratedMessage
+      implements InstallSnapshotOrBuilder {
+    // Use InstallSnapshot.newBuilder() to construct.
+    private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final InstallSnapshot defaultInstance;
+    public static InstallSnapshot getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public InstallSnapshot getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private InstallSnapshot(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              term_ = input.readInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              leaderId_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              lastIncludedIndex_ = input.readInt64();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              lastIncludedTerm_ = input.readInt64();
+              break;
+            }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              data_ = input.readBytes();
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              chunkIndex_ = input.readInt32();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              totalChunks_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
+        new com.google.protobuf.AbstractParser<InstallSnapshot>() {
+      public InstallSnapshot parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new InstallSnapshot(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<InstallSnapshot> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int64 term = 1;
+    public static final int TERM_FIELD_NUMBER = 1;
+    private long term_;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public boolean hasTerm() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    public long getTerm() {
+      return term_;
+    }
+
+    // optional string leaderId = 2;
+    public static final int LEADERID_FIELD_NUMBER = 2;
+    private java.lang.Object leaderId_;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public boolean hasLeaderId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public java.lang.String getLeaderId() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          leaderId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getLeaderIdBytes() {
+      java.lang.Object ref = leaderId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        leaderId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional int64 lastIncludedIndex = 3;
+    public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3;
+    private long lastIncludedIndex_;
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public boolean hasLastIncludedIndex() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 lastIncludedIndex = 3;</code>
+     */
+    public long getLastIncludedIndex() {
+      return lastIncludedIndex_;
+    }
+
+    // optional int64 lastIncludedTerm = 4;
+    public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4;
+    private long lastIncludedTerm_;
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public boolean hasLastIncludedTerm() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int64 lastIncludedTerm = 4;</code>
+     */
+    public long getLastIncludedTerm() {
+      return lastIncludedTerm_;
+    }
+
+    // optional bytes data = 5;
+    public static final int DATA_FIELD_NUMBER = 5;
+    private com.google.protobuf.ByteString data_;
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public boolean hasData() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes data = 5;</code>
+     */
+    public com.google.protobuf.ByteString getData() {
+      return data_;
+    }
+
+    // optional int32 chunkIndex = 6;
+    public static final int CHUNKINDEX_FIELD_NUMBER = 6;
+    private int chunkIndex_;
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public boolean hasChunkIndex() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int32 chunkIndex = 6;</code>
+     */
+    public int getChunkIndex() {
+      return chunkIndex_;
+    }
+
+    // optional int32 totalChunks = 7;
+    public static final int TOTALCHUNKS_FIELD_NUMBER = 7;
+    private int totalChunks_;
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public boolean hasTotalChunks() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional int32 totalChunks = 7;</code>
+     */
+    public int getTotalChunks() {
+      return totalChunks_;
+    }
+
+    private void initFields() {
+      term_ = 0L;
+      leaderId_ = "";
+      lastIncludedIndex_ = 0L;
+      lastIncludedTerm_ = 0L;
+      data_ = com.google.protobuf.ByteString.EMPTY;
+      chunkIndex_ = 0;
+      totalChunks_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt64(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt32(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeInt32(7, totalChunks_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, term_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getLeaderIdBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(3, lastIncludedIndex_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(4, lastIncludedTerm_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, data_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(6, chunkIndex_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, totalChunks_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+      }
+
+      // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        term_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        leaderId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        lastIncludedIndex_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedTerm_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        data_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        chunkIndex_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        totalChunks_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.term_ = term_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.leaderId_ = leaderId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.lastIncludedIndex_ = lastIncludedIndex_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.lastIncludedTerm_ = lastIncludedTerm_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.data_ = data_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.chunkIndex_ = chunkIndex_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.totalChunks_ = totalChunks_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
+          return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
+        if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+        if (other.hasTerm()) {
+          setTerm(other.getTerm());
+        }
+        if (other.hasLeaderId()) {
+          bitField0_ |= 0x00000002;
+          leaderId_ = other.leaderId_;
+          onChanged();
+        }
+        if (other.hasLastIncludedIndex()) {
+          setLastIncludedIndex(other.getLastIncludedIndex());
+        }
+        if (other.hasLastIncludedTerm()) {
+          setLastIncludedTerm(other.getLastIncludedTerm());
+        }
+        if (other.hasData()) {
+          setData(other.getData());
+        }
+        if (other.hasChunkIndex()) {
+          setChunkIndex(other.getChunkIndex());
+        }
+        if (other.hasTotalChunks()) {
+          setTotalChunks(other.getTotalChunks());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int64 term = 1;
+      private long term_ ;
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public boolean hasTerm() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public long getTerm() {
+        return term_;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder setTerm(long value) {
+        bitField0_ |= 0x00000001;
+        term_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      public Builder clearTerm() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        term_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional string leaderId = 2;
+      private java.lang.Object leaderId_ = "";
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public boolean hasLeaderId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public java.lang.String getLeaderId() {
+        java.lang.Object ref = leaderId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          leaderId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getLeaderIdBytes() {
+        java.lang.Object ref = leaderId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          leaderId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder clearLeaderId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        leaderId_ = getDefaultInstance().getLeaderId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string leaderId = 2;</code>
+       */
+      public Builder setLeaderIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        leaderId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedIndex = 3;
+      private long lastIncludedIndex_ ;
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public boolean hasLastIncludedIndex() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public long getLastIncludedIndex() {
+        return lastIncludedIndex_;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder setLastIncludedIndex(long value) {
+        bitField0_ |= 0x00000004;
+        lastIncludedIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedIndex = 3;</code>
+       */
+      public Builder clearLastIncludedIndex() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        lastIncludedIndex_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 lastIncludedTerm = 4;
+      private long lastIncludedTerm_ ;
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public boolean hasLastIncludedTerm() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public long getLastIncludedTerm() {
+        return lastIncludedTerm_;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder setLastIncludedTerm(long value) {
+        bitField0_ |= 0x00000008;
+        lastIncludedTerm_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastIncludedTerm = 4;</code>
+       */
+      public Builder clearLastIncludedTerm() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        lastIncludedTerm_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes data = 5;
+      private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public boolean hasData() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public com.google.protobuf.ByteString getData() {
+        return data_;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder setData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        data_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes data = 5;</code>
+       */
+      public Builder clearData() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        data_ = getDefaultInstance().getData();
+        onChanged();
+        return this;
+      }
+
+      // optional int32 chunkIndex = 6;
+      private int chunkIndex_ ;
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public boolean hasChunkIndex() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public int getChunkIndex() {
+        return chunkIndex_;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder setChunkIndex(int value) {
+        bitField0_ |= 0x00000020;
+        chunkIndex_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 chunkIndex = 6;</code>
+       */
+      public Builder clearChunkIndex() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        chunkIndex_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional int32 totalChunks = 7;
+      private int totalChunks_ ;
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public boolean hasTotalChunks() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public int getTotalChunks() {
+        return totalChunks_;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder setTotalChunks(int value) {
+        bitField0_ |= 0x00000040;
+        totalChunks_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 totalChunks = 7;</code>
+       */
+      public Builder clearTotalChunks() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        totalChunks_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+    }
+
+    static {
+      defaultInstance = new InstallSnapshot(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n\025InstallSnapshot.proto\022(org.opendayligh" +
+      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
+      "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
+      "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
+      "light.controller.cluster.raft.protobuff." +
+      "messagesB\027InstallSnapshotMessagesH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto
new file mode 100644 (file)
index 0000000..14f821b
--- /dev/null
@@ -0,0 +1,15 @@
+package org.opendaylight.controller.cluster.raft;
+
+option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_outer_classname = "InstallSnapshotMessages";
+option optimize_for = SPEED;
+
+message InstallSnapshot {
+    optional int64 term = 1;
+    optional string leaderId = 2;
+    optional int64 lastIncludedIndex = 3;
+    optional int64 lastIncludedTerm = 4;
+    optional bytes data = 5;
+    optional int32 chunkIndex = 6;
+    optional int32 totalChunks = 7;
+}
index ea3c9e759d51744fb2dd36212bf8372e2f06f0c8..ca34a34ca49337783ec7fda8a9b9966fba1f16ac 100644 (file)
@@ -14,17 +14,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-import com.google.common.base.Preconditions;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class MockRaftActorContext implements RaftActorContext {
@@ -37,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private final ElectionTerm electionTerm;
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap();
+    private ConfigParams configParams;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -79,6 +77,8 @@ public class MockRaftActorContext implements RaftActorContext {
             }
         };
 
+        configParams = new DefaultConfigParamsImpl();
+
         initReplicatedLog();
     }
 
@@ -179,118 +179,21 @@ public class MockRaftActorContext implements RaftActorContext {
 
     @Override
     public ConfigParams getConfigParams() {
-        return new DefaultConfigParamsImpl();
+        return configParams;
     }
 
-    public static class SimpleReplicatedLog implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> log = new ArrayList<>();
-
-        @Override public ReplicatedLogEntry get(long index) {
-            if(index >= log.size() || index < 0){
-                return null;
-            }
-            return log.get((int) index);
-        }
-
-        @Override public ReplicatedLogEntry last() {
-            if(log.size() == 0){
-                return null;
-            }
-            return log.get(log.size()-1);
-        }
-
-        @Override public long lastIndex() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getIndex();
-        }
-
-        @Override public long lastTerm() {
-            if(log.size() == 0){
-                return -1;
-            }
-
-            return last().getTerm();
-        }
-
-        @Override public void removeFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return;
-            }
-
-            log.subList((int) index, log.size()).clear();
-            //log.remove((int) index);
-        }
-
-        @Override public void removeFromAndPersist(long index) {
-            removeFrom(index);
-        }
-
-        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
-            log.add(replicatedLogEntry);
-        }
+    public void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
 
+    public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
             append(replicatedLogEntry);
         }
 
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            for(int i=(int) index ; i < log.size() ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
-            if(index >= log.size() || index < 0){
-                return Collections.EMPTY_LIST;
-            }
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            int maxIndex = (int) index + max;
-            if(maxIndex > log.size()){
-                maxIndex = log.size();
-            }
-
-            for(int i=(int) index ; i < maxIndex ; i++) {
-                entries.add(get(i));
-            }
-            return entries;
-
-        }
-
-        @Override public long size() {
-            return log.size();
-        }
-
-        @Override public boolean isPresent(long index) {
-            if(index >= log.size() || index < 0){
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override public boolean isInSnapshot(long index) {
-            return false;
-        }
-
-        @Override public Object getSnapshot() {
-            return null;
-        }
-
-        @Override public long getSnapshotIndex() {
-            return -1;
-        }
-
-        @Override public long getSnapshotTerm() {
-            return -1;
+        @Override public void removeFromAndPersist(long index) {
+            removeFrom(index);
         }
     }
 
index ff0ffeb271b55b38455901d03cb31871620b078c..12123db12995061901a39a264c79f0237d78d00a 100644 (file)
@@ -6,6 +6,7 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -39,11 +40,11 @@ public class RaftActorTest extends AbstractActorTest {
             Object data) {
         }
 
-        @Override protected Object createSnapshot() {
+        @Override protected void createSnapshot() {
             throw new UnsupportedOperationException("createSnapshot");
         }
 
-        @Override protected void applySnapshot(Object snapshot) {
+        @Override protected void applySnapshot(ByteString snapshot) {
             throw new UnsupportedOperationException("applySnapshot");
         }
 
index c5a81aa1c9225ea03fa548bf1950d5e73a7e3329..227d1effa7e9b8b3bb65932fe8c25b1a2eecdbf5 100644 (file)
@@ -158,17 +158,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 createActorContext();
 
             context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload(""));
+            setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload(""));
+            ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
 
             List<ReplicatedLogEntry> entries =
                 Arrays.asList(
-                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
                         new MockRaftActorContext.MockPayload("foo"))
                 );
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
index 17c22a134a9a7f26e08998930b2609b128f40c21..73c9f96b82a0a582f4cf5e61b5d68c488f9bc198 100644 (file)
@@ -1,24 +1,40 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.google.protobuf.ByteString;
+import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -82,8 +98,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
-
-
             };
         }};
     }
@@ -194,18 +208,372 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     assertEquals("match", out);
 
                 }
+            };
+        }};
+    }
+
+    @Test
+    public void testSendInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext(getRef());
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+                    RaftState raftState = leader.handleMessage(
+                        senderActor, new Replicate(null, "state-id", entry));
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
+                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                        @Override
+                        protected Boolean match(Object o) throws Exception {
+                            if (o instanceof SendInstallSnapshot) {
+                                return true;
+                            }
+                            return false;
+                        }
+                    }.get();
+
+                    boolean sendInstallSnapshotReceived = false;
+                    for (Boolean b: matches) {
+                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
+                    }
+
+                    assertTrue(sendInstallSnapshotReceived);
+
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testInstallSnapshot() {
+        new LeaderTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    // new entry
+                    ReplicatedLogImplEntry entry =
+                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                            new MockRaftActorContext.MockPayload("D"));
+
+
+                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+
+                    assertEquals(RaftState.Leader, raftState);
+
+                    // check if installsnapshot gets called with the correct values.
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                                    InstallSnapshot is = (InstallSnapshot)
+                                        SerializationUtils.fromSerializable(in);
+                                    if (is.getData() == null) {
+                                        return "InstallSnapshot data is null";
+                                    }
+                                    if (is.getLastIncludedIndex() != snapshotIndex) {
+                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                                    }
+                                    if (is.getLastIncludedTerm() != snapshotTerm) {
+                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                                    }
+                                    if (is.getTerm() == currentTerm) {
+                                        return is.getTerm() + "!=" + currentTerm;
+                                    }
+
+                                    return "match";
+
+                               } else {
+                                    return "message mismatch:" + in.getClass();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleInstallSnapshotReplyLastChunk() {
+        new LeaderTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    ActorRef followerActor = getTestActor();
+
+                    Map<String, String> peerAddresses = new HashMap();
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+                    actorContext.setPeerAddresses(peerAddresses);
+
+                    final int followersLastIndex = 2;
+                    final int snapshotIndex = 3;
+                    final int newEntryIndex = 4;
+                    final int snapshotTerm = 1;
+                    final int currentTerm = 2;
+
+                    MockLeader leader = new MockLeader(actorContext);
+                    // set the follower info in leader
+                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+                    Map<String, String> leadersSnapshot = new HashMap<>();
+                    leadersSnapshot.put("1", "A");
+                    leadersSnapshot.put("2", "B");
+                    leadersSnapshot.put("3", "C");
+
+                    // set the snapshot variables in replicatedlog
+                    actorContext.getReplicatedLog().setSnapshot(
+                        toByteString(leadersSnapshot));
+                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                    ByteString bs = toByteString(leadersSnapshot);
+                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                        leader.getFollowerToSnapshot().getNextChunk();
+                        leader.getFollowerToSnapshot().incrementChunkIndex();
+                    }
+
+                    //clears leaders log
+                    actorContext.getReplicatedLog().removeFrom(0);
 
+                    RaftState raftState = leader.handleMessage(senderActor,
+                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                            leader.getFollowerToSnapshot().getChunkIndex(), true));
 
+                    assertEquals(RaftState.Leader, raftState);
+
+                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+                    assertEquals(leader.followerToLog.size(), 1);
+                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
+                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+                }
             };
         }};
     }
 
+    @Test
+    public void testFollowerToSnapshotLogic() {
+
+        MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
+
+        MockLeader leader = new MockLeader(actorContext);
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        ByteString bs = toByteString(leadersSnapshot);
+        byte[] barray = bs.toByteArray();
+
+        leader.createFollowerToSnapshot("followerId", bs);
+        assertEquals(bs.size(), barray.length);
+
+        int chunkIndex=0;
+        for (int i=0; i < barray.length; i = i + 50) {
+            int j = i + 50;
+            chunkIndex++;
+
+            if (i + 50 > barray.length) {
+                j = barray.length;
+            }
+
+            ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+
+            leader.getFollowerToSnapshot().markSendStatus(true);
+            if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+        }
+
+        assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+    }
+
+
     @Override protected RaftActorBehavior createBehavior(
         RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
     @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), leaderActor);
+        return createActorContext(leaderActor);
+    }
+
+    protected RaftActorContext createActorContext(ActorRef actorRef) {
+        return new MockRaftActorContext("test", getSystem(), actorRef);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+    private static class LeaderTestKit extends JavaTestKit {
+
+        private LeaderTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+            new JavaTestKit.EventFilter<Boolean>(logLevel
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(subject.path().toString())
+                .message(logMessage)
+                .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+    }
+
+    class MockLeader extends Leader {
+
+        FollowerToSnapshot fts;
+
+        public MockLeader(RaftActorContext context){
+            super(context);
+        }
+
+        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(nextIndex),
+                    new AtomicLong(matchIndex));
+            followerToLog.put(followerId, followerLogInformation);
+        }
+
+        public FollowerToSnapshot getFollowerToSnapshot() {
+            return fts;
+        }
+
+        public void createFollowerToSnapshot(String followerId, ByteString bs ) {
+            fts = new FollowerToSnapshot(bs);
+            mapFollowerToSnapshot.put(followerId, fts);
+
+        }
     }
 }
index 43a9faa3e44e5d6fe77db7cce74929f8888dcadb..d8af74c84c7a0acd9fdaa106873196b49fd4850e 100644 (file)
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -365,7 +365,6 @@ public class Shard extends RaftActor {
                     identifier, clientActor.path().toString());
             }
 
-
         } else {
             LOG.error("Unknown state received {}", data);
         }
@@ -383,11 +382,11 @@ public class Shard extends RaftActor {
 
     }
 
-    @Override protected Object createSnapshot() {
+    @Override protected void createSnapshot() {
         throw new UnsupportedOperationException("createSnapshot");
     }
 
-    @Override protected void applySnapshot(Object snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         throw new UnsupportedOperationException("applySnapshot");
     }