BUG 2371 : Leader should reset it's snapshot tracking when follower is restarted 09/13109/5
authorMoiz Raja <moraja@cisco.com>
Tue, 25 Nov 2014 05:22:34 +0000 (21:22 -0800)
committerMoiz Raja <moraja@cisco.com>
Mon, 1 Dec 2014 18:38:52 +0000 (10:38 -0800)
This patch adds a new protocol to InstallSnapshot. It the InstallSnapshotReply returns
a failure and the chunkIndex is -1 then the Leader will reset the FollowerSnapshot so
that when the next heartbeat occurs the Leader would start sending chunks from the beginning.

Change-Id: I0d5f0a4230209856ecf9bcef46220ae348f52b5d
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto

index d85ac8e..e5c5dc7 100644 (file)
@@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+    // The index of the first chunk that is sent when installing a snapshot
+    public static final int FIRST_CHUNK_INDEX = 1;
+
+    // The index that the follower should respond with if it needs the install snapshot to be reset
+    public static final int INVALID_CHUNK_INDEX = -1;
+
+    // This would be passed as the hash code of the last chunk when sending the first chunk
+    public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
     protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
@@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         "sending snapshot chunk failed, Will retry, Chunk:{}",
                     reply.getChunkIndex()
                 );
+
                 followerToSnapshot.markSendStatus(false);
             }
 
@@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
             );
+
+            if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+                // Since the Follower did not find this index to be valid we should reset the follower snapshot
+                // so that Installing the snapshot can resume from the beginning
+                followerToSnapshot.reset();
+            }
         }
     }
 
@@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotTerm(),
                         getNextSnapshotChunk(followerId,snapshot.get()),
                         mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
@@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         private boolean replyStatus = false;
         private int chunkIndex;
         private int totalChunks;
+        private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
         public FollowerToSnapshot(ByteString snapshotBytes) {
             this.snapshotBytes = snapshotBytes;
-            replyReceivedForOffset = -1;
-            chunkIndex = 1;
             int size = snapshotBytes.size();
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
@@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
                     size, totalChunks);
             }
+            replyReceivedForOffset = -1;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
         }
 
         public ByteString getSnapshotBytes() {
@@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // if the chunk sent was successful
                 replyReceivedForOffset = offset;
                 replyStatus = true;
+                lastChunkHashCode = nextChunkHashCode;
             } else {
                 // if the chunk sent was failure
                 replyReceivedForOffset = offset;
@@ -715,8 +736,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("length={}, offset={},size={}",
                     snapshotLength, start, size);
             }
-            return getSnapshotBytes().substring(start, start + size);
+            ByteString substring = getSnapshotBytes().substring(start, start + size);
+            nextChunkHashCode = substring.hashCode();
+            return substring;
+        }
+
+        /**
+         * reset should be called when the Follower needs to be sent the snapshot from the beginning
+         */
+        public void reset(){
+            offset = 0;
+            replyStatus = false;
+            replyReceivedForOffset = offset;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+            lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        }
 
+        public int getLastChunkHashCode() {
+            return lastChunkHashCode;
         }
     }
 
index 7ada8b3..b1c73f6 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.ArrayList;
-
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -36,7 +36,8 @@ import java.util.ArrayList;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
-    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
+    private SnapshotTracker snapshotTracker = null;
 
     public Follower(RaftActorContext context) {
         super(context);
@@ -280,48 +281,54 @@ public class Follower extends AbstractRaftActorBehavior {
             );
         }
 
-        try {
-            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
-                // this is the last chunk, create a snapshot object and apply
-
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
-                            snapshotChunksCollected.size());
-                }
+        if(snapshotTracker == null){
+            snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+        }
 
-                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
-                    new ArrayList<ReplicatedLogEntry>(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm());
+        try {
+            if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+                    installSnapshot.getLastChunkHashCode())){
+                Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+                        new ArrayList<ReplicatedLogEntry>(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm());
 
                 actor().tell(new ApplySnapshot(snapshot), actor());
 
-            } else {
-                // we have more to go
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                snapshotTracker = null;
 
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
-                        installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
-                }
             }
 
             sender.tell(new InstallSnapshotReply(
-                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
-                true), actor());
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                    true), actor());
+
+        } catch (SnapshotTracker.InvalidChunkException e) {
+
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                    -1, false), actor());
+            snapshotTracker = null;
+
+        } catch (Exception e){
 
-        } catch (Exception e) {
             LOG.error(e, "Exception in InstallSnapshot of follower:");
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
-                installSnapshot.getChunkIndex(), false), actor());
+                    installSnapshot.getChunkIndex(), false), actor());
+
         }
     }
 
     @Override public void close() throws Exception {
         stopElection();
     }
+
+    @VisibleForTesting
+    ByteString getSnapshotChunksCollected(){
+        return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+    }
+
+
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
new file mode 100644 (file)
index 0000000..26fbde0
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+
+/**
+ * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ */
+public class SnapshotTracker {
+    private final LoggingAdapter LOG;
+    private final int totalChunks;
+    private ByteString collectedChunks = ByteString.EMPTY;
+    private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+    private boolean sealed = false;
+    private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+
+    SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+        this.LOG = LOG;
+        this.totalChunks = totalChunks;
+    }
+
+    /**
+     * Adds a chunk to the tracker
+     *
+     * @param chunkIndex
+     * @param chunk
+     * @return true when the lastChunk is received
+     * @throws InvalidChunkException
+     */
+    boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+        if(sealed){
+            throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+        }
+
+        if(lastChunkIndex + 1 != chunkIndex){
+            throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
+        }
+
+        if(lastChunkHashCode.isPresent()){
+            if(lastChunkHashCode.get() != this.lastChunkHashCode){
+                throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
+                        "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+            }
+        }
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Chunk={},collectedChunks.size:{}",
+                    chunkIndex, collectedChunks.size());
+        }
+
+        sealed = (chunkIndex == totalChunks);
+        lastChunkIndex = chunkIndex;
+        collectedChunks = collectedChunks.concat(chunk);
+        this.lastChunkHashCode = chunk.hashCode();
+        return sealed;
+    }
+
+    byte[] getSnapshot(){
+        if(!sealed) {
+            throw new IllegalStateException("lastChunk not received yet");
+        }
+
+        return collectedChunks.toByteArray();
+    }
+
+    ByteString getCollectedChunks(){
+        return collectedChunks;
+    }
+
+    public static class InvalidChunkException extends Exception {
+        InvalidChunkException(String message){
+            super(message);
+        }
+    }
+
+}
index 3c4e811..6337f8f 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
@@ -22,9 +23,10 @@ public class InstallSnapshot extends AbstractRaftRPC {
     private final ByteString data;
     private final int chunkIndex;
     private final int totalChunks;
+    private final Optional<Integer> lastChunkHashCode;
 
     public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
-        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
@@ -32,8 +34,15 @@ public class InstallSnapshot extends AbstractRaftRPC {
         this.data = data;
         this.chunkIndex = chunkIndex;
         this.totalChunks = totalChunks;
+        this.lastChunkHashCode = lastChunkHashCode;
     }
 
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+                           long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+        this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+    }
+
+
     public String getLeaderId() {
         return leaderId;
     }
@@ -58,25 +67,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return totalChunks;
     }
 
-    public <T extends Object> Object toSerializable(){
-        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
-            .setLeaderId(this.getLeaderId())
-            .setChunkIndex(this.getChunkIndex())
-            .setData(this.getData())
-            .setLastIncludedIndex(this.getLastIncludedIndex())
-            .setLastIncludedTerm(this.getLastIncludedTerm())
-            .setTotalChunks(this.getTotalChunks()).build();
+    public Optional<Integer> getLastChunkHashCode() {
+        return lastChunkHashCode;
+    }
 
+    public <T extends Object> Object toSerializable(){
+        InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+                .setLeaderId(this.getLeaderId())
+                .setChunkIndex(this.getChunkIndex())
+                .setData(this.getData())
+                .setLastIncludedIndex(this.getLastIncludedIndex())
+                .setLastIncludedTerm(this.getLastIncludedTerm())
+                .setTotalChunks(this.getTotalChunks());
+
+        if(lastChunkHashCode.isPresent()){
+            builder.setLastChunkHashCode(lastChunkHashCode.get());
+        }
+        return builder.build();
     }
 
     public static InstallSnapshot fromSerializable (Object o) {
         InstallSnapshotMessages.InstallSnapshot from =
             (InstallSnapshotMessages.InstallSnapshot) o;
 
+        Optional<Integer> lastChunkHashCode = Optional.absent();
+        if(from.hasLastChunkHashCode()){
+            lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+        }
+
         InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
             from.getLeaderId(), from.getLastIncludedIndex(),
             from.getLastIncludedTerm(), from.getData(),
-            from.getChunkIndex(), from.getTotalChunks());
+            from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
 
         return installSnapshot;
     }
index 83b9ad3..0ee9693 100644 (file)
@@ -1,9 +1,20 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -20,19 +31,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private final ActorRef followerActor = getSystem().actorOf(Props.create(
@@ -452,18 +450,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             int offset = 0;
             int snapshotLength = bsSnapshot.size();
             int i = 1;
+            int chunkIndex = 1;
 
             do {
                 chunkData = getNextChunk(bsSnapshot, offset);
                 final InstallSnapshot installSnapshot =
                     new InstallSnapshot(1, "leader-1", i, 1,
-                        chunkData, i, 3);
+                        chunkData, chunkIndex, 3);
                 follower.handleMessage(leaderActor, installSnapshot);
                 offset = offset + 50;
                 i++;
+                chunkIndex++;
             } while ((offset+50) < snapshotLength);
 
-            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
             follower.handleMessage(leaderActor, installSnapshot3);
 
             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
@@ -490,6 +490,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 }
             }.get();
 
+            // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+            assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
             String applySnapshotMatch = "";
             for (String reply: matches) {
                 if (reply.startsWith("applySnapshot")) {
@@ -517,6 +520,52 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor = getSystem().actorOf(Props.create(
+                        MessageCollectorActor.class));
+
+                MockRaftActorContext context = (MockRaftActorContext)
+                        createActorContext(getRef());
+
+                Follower follower = (Follower) createBehavior(context);
+
+                HashMap<String, String> followerSnapshot = new HashMap<>();
+                followerSnapshot.put("1", "A");
+                followerSnapshot.put("2", "B");
+                followerSnapshot.put("3", "C");
+
+                ByteString bsSnapshot = toByteString(followerSnapshot);
+
+                final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+                follower.handleMessage(leaderActor, installSnapshot);
+
+                Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+                assertNotNull(messages);
+                assertTrue(messages instanceof List);
+                List<Object> listMessages = (List<Object>) messages;
+
+                int installSnapshotReplyReceivedCount = 0;
+                for (Object message: listMessages) {
+                    if (message instanceof InstallSnapshotReply) {
+                        ++installSnapshotReplyReceivedCount;
+                    }
+                }
+
+                assertEquals(1, installSnapshotReplyReceivedCount);
+                InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+                assertEquals(false, reply.isSuccess());
+                assertEquals(-1, reply.getChunkIndex());
+                assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+            }};
+    }
+
     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
         return MessageCollectorActor.getAllMessages(actor);
     }
index 0fc0b4c..895fe35 100644 (file)
@@ -1,11 +1,16 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -41,9 +46,6 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -125,7 +127,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                            followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
@@ -570,6 +572,156 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                    followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                    (MockRaftActorContext) createActorContext();
+
+            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            });
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.tell(PoisonPill.getInstance(), getRef());
+        }};
+    }
+
+    @Test
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                TestActorRef<MessageCollectorActor> followerActor =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                final int followersLastIndex = 2;
+                final int snapshotIndex = 3;
+                final int snapshotTerm = 1;
+                final int currentTerm = 2;
+
+                MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+
+                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+                    @Override
+                    public int getSnapshotChunkSize() {
+                        return 50;
+                    }
+                });
+                actorContext.setPeerAddresses(peerAddresses);
+                actorContext.setCommitIndex(followersLastIndex);
+
+                MockLeader leader = new MockLeader(actorContext);
+
+                Map<String, String> leadersSnapshot = new HashMap<>();
+                leadersSnapshot.put("1", "A");
+                leadersSnapshot.put("2", "B");
+                leadersSnapshot.put("3", "C");
+
+                // set the snapshot variables in replicatedlog
+                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                ByteString bs = toByteString(leadersSnapshot);
+                leader.setSnapshot(Optional.of(bs));
+
+                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(1, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+                int hashCode = installSnapshot.getData().hashCode();
+
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(2, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+                followerActor.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
     @Test
     public void testFollowerToSnapshotLogic() {
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
new file mode 100644 (file)
index 0000000..1b3a8f5
--- /dev/null
@@ -0,0 +1,181 @@
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnapshotTrackerTest {
+
+    Map<String, String> data;
+    ByteString byteString;
+    ByteString chunk1;
+    ByteString chunk2;
+    ByteString chunk3;
+
+    @Before
+    public void setup(){
+        data = new HashMap<>();
+        data.put("key1", "value1");
+        data.put("key2", "value2");
+        data.put("key3", "value3");
+
+        byteString = toByteString(data);
+        chunk1 = getNextChunk(byteString, 0, 10);
+        chunk2 = getNextChunk(byteString, 10, 10);
+        chunk3 = getNextChunk(byteString, 20, byteString.size());
+    }
+
+    @Test
+    public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+        tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
+
+        // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
+        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+
+        try {
+            tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+            e.getMessage().startsWith("Invalid chunk");
+        }
+
+        // The first chunk's index must at least be FIRST_CHUNK_INDEX
+        SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        try {
+            tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+        // Out of sequence chunk indexes won't work
+        SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+
+        try {
+            tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+        // No exceptions will be thrown when invalid chunk is added with the right sequence
+        // If the lastChunkHashCode is missing
+        SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+        // Look I can add the same chunk again
+        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+
+        // An exception will be thrown when an invalid chunk is addedd with the right sequence
+        // when the lastChunkHashCode is present
+        SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+
+        try {
+            // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
+            tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+            Assert.fail();
+        }catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+    }
+
+    @Test
+    public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
+
+        // Trying to get a snapshot before all chunks have been received will throw an exception
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        try {
+            tracker1.getSnapshot();
+            Assert.fail();
+        } catch(IllegalStateException e){
+
+        }
+
+        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+
+        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+        tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+
+        byte[] snapshot = tracker2.getSnapshot();
+
+        assertEquals(byteString, ByteString.copyFrom(snapshot));
+    }
+
+    @Test
+    public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        ByteString chunks = chunk1.concat(chunk2);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+
+        assertEquals(chunks, tracker1.getCollectedChunks());
+    }
+
+    public ByteString getNextChunk (ByteString bs, int offset, int size){
+        int snapshotLength = bs.size();
+        int start = offset;
+        if (size > snapshotLength) {
+            size = snapshotLength;
+        } else {
+            if ((start + size) > snapshotLength) {
+                size = snapshotLength - start;
+            }
+        }
+        return bs.substring(start, start + size);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+
+}
\ No newline at end of file
index 9fbdd45..3469a95 100644 (file)
@@ -13,15 +13,14 @@ import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 
 public class MessageCollectorActor extends UntypedActor {
     private List<Object> messages = new ArrayList<>();
index b93be3e..de7f44e 100644 (file)
@@ -85,6 +85,16 @@ public final class InstallSnapshotMessages {
      * <code>optional int32 totalChunks = 7;</code>
      */
     int getTotalChunks();
+
+    // optional int32 lastChunkHashCode = 8;
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    boolean hasLastChunkHashCode();
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    int getLastChunkHashCode();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
@@ -172,6 +182,11 @@ public final class InstallSnapshotMessages {
               totalChunks_ = input.readInt32();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              lastChunkHashCode_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -351,6 +366,22 @@ public final class InstallSnapshotMessages {
       return totalChunks_;
     }
 
+    // optional int32 lastChunkHashCode = 8;
+    public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8;
+    private int lastChunkHashCode_;
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    public boolean hasLastChunkHashCode() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    public int getLastChunkHashCode() {
+      return lastChunkHashCode_;
+    }
+
     private void initFields() {
       term_ = 0L;
       leaderId_ = "";
@@ -359,6 +390,7 @@ public final class InstallSnapshotMessages {
       data_ = com.google.protobuf.ByteString.EMPTY;
       chunkIndex_ = 0;
       totalChunks_ = 0;
+      lastChunkHashCode_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -393,6 +425,9 @@ public final class InstallSnapshotMessages {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeInt32(7, totalChunks_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeInt32(8, lastChunkHashCode_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -430,6 +465,10 @@ public final class InstallSnapshotMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(7, totalChunks_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(8, lastChunkHashCode_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -560,6 +599,8 @@ public final class InstallSnapshotMessages {
         bitField0_ = (bitField0_ & ~0x00000020);
         totalChunks_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        lastChunkHashCode_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -616,6 +657,10 @@ public final class InstallSnapshotMessages {
           to_bitField0_ |= 0x00000040;
         }
         result.totalChunks_ = totalChunks_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.lastChunkHashCode_ = lastChunkHashCode_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -655,6 +700,9 @@ public final class InstallSnapshotMessages {
         if (other.hasTotalChunks()) {
           setTotalChunks(other.getTotalChunks());
         }
+        if (other.hasLastChunkHashCode()) {
+          setLastChunkHashCode(other.getLastChunkHashCode());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -957,6 +1005,39 @@ public final class InstallSnapshotMessages {
         return this;
       }
 
+      // optional int32 lastChunkHashCode = 8;
+      private int lastChunkHashCode_ ;
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public boolean hasLastChunkHashCode() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public int getLastChunkHashCode() {
+        return lastChunkHashCode_;
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public Builder setLastChunkHashCode(int value) {
+        bitField0_ |= 0x00000080;
+        lastChunkHashCode_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public Builder clearLastChunkHashCode() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        lastChunkHashCode_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
     }
 
@@ -983,13 +1064,14 @@ public final class InstallSnapshotMessages {
   static {
     java.lang.String[] descriptorData = {
       "\n\025InstallSnapshot.proto\022(org.opendayligh" +
-      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "t.controller.cluster.raft\"\270\001\n\017InstallSna" +
       "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
       "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
       "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
-      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
-      "light.controller.protobuff.messages.clus" +
-      "ter.raftB\027InstallSnapshotMessagesH\001"
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" +
+      "shCode\030\010 \001(\005BX\n;org.opendaylight.control" +
+      "ler.protobuff.messages.cluster.raftB\027Ins" +
+      "tallSnapshotMessagesH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1001,7 +1083,7 @@ public final class InstallSnapshotMessages {
           internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
-              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", });
           return null;
         }
       };
index 4198644..adb58ae 100644 (file)
@@ -12,4 +12,5 @@ message InstallSnapshot {
     optional bytes data = 5;
     optional int32 chunkIndex = 6;
     optional int32 totalChunks = 7;
+    optional int32 lastChunkHashCode = 8;
 }