Backport InstallSnapshot message serialization changes 19/41019/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 27 Jan 2016 07:33:32 +0000 (02:33 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 5 Jul 2016 05:14:16 +0000 (05:14 +0000)
Backported https://git.opendaylight.org/gerrit/#/c/33768/ and
https://git.opendaylight.org/gerrit/#/c/33767/ from master to
eliminate the protobuff serialization to make it easier to change
the serialization. A subsequent patch will add a new field to
InstallSnapshot. Backwards compatibility with versions prior to
Be SR3 is maintained, ie protobuff serialization will still be used.

Change-Id: I465daba0b83e35bfe0e0d5c345a497dd7f9425d4
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java

index 6618a97f21f0d1b7145abc77171a950d9bf9d1e8..4367a7a151ff729bcac7de25f4fbeaf54b114089 100644 (file)
@@ -108,4 +108,14 @@ public interface FollowerLogInformation {
      * Sets the payload data version of the follower.
      */
     void setPayloadVersion(short payloadVersion);
+
+    /**
+     * @return the raft version of the follower.
+     */
+    short getRaftVersion();
+
+    /**
+     * Sets the raft version of the follower.
+     */
+    void setRaftVersion(short payloadVersion);
 }
index 1c8d5e6e10647bcbac9ad7f55dd247e6f51f94c6..89884462128b10147383350429f979bd1f8744ca 100644 (file)
@@ -27,6 +27,12 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private short payloadVersion = -1;
 
+    // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
+    // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
+    // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
+    // HELIUM_VERSION.
+    private short raftVersion = RaftVersions.HELIUM_VERSION;
+
     private final PeerInfo peerInfo;
 
     public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
@@ -153,6 +159,16 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         this.payloadVersion = payloadVersion;
     }
 
+    @Override
+    public short getRaftVersion() {
+        return raftVersion;
+    }
+
+    @Override
+    public void setRaftVersion(short raftVersion) {
+        this.raftVersion = raftVersion;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
index 4330a4caa01180dce5967c2259e728798e3530f0..9a67cff5c6f30ff5b8bbe8a99a8c6d99454c04c1 100644 (file)
@@ -13,5 +13,6 @@ package org.opendaylight.controller.cluster.raft;
 public interface RaftVersions {
     short HELIUM_VERSION = 0;
     short LITHIUM_VERSION = 1;
-    short CURRENT_VERSION = LITHIUM_VERSION;
+    short BERYLLIUM_SR3_VERSION = 3;
+    short CURRENT_VERSION = BERYLLIUM_SR3_VERSION;
 }
index 7ec32440e7fdfcbe24e2a252f9859860de634ff1..bd33d2385a3e32c13a1351034793f5ed5d4550a8 100644 (file)
@@ -17,7 +17,7 @@ public class SerializationUtils {
         if(AppendEntries.isSerializedType(serializable)){
             return AppendEntries.fromSerializable(serializable);
 
-        } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+        } else if (InstallSnapshot.isSerializedType(serializable)) {
             return InstallSnapshot.fromSerializable(serializable);
         }
         return serializable;
index 5271629df175ca99f0d920a3ba44c9f7e0eb177a..fc79532c68665ad7fafc89b243f0896fa38ddb35 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -211,6 +212,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         followerLogInformation.markFollowerActive();
         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
+        followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
 
         boolean updated = false;
         if (appendEntriesReply.isSuccess()) {
@@ -709,7 +711,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
             if (snapshot.isPresent()) {
-                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
+                byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
 
                 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
                 // followerId to the followerToSnapshot map.
@@ -723,7 +725,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         followerToSnapshot.incrementChunkIndex(),
                         followerToSnapshot.getTotalChunks(),
                         Optional.of(followerToSnapshot.getLastChunkHashCode())
-                    ).toSerializable(),
+                    ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
                     actor()
                 );
 
@@ -742,15 +744,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * Acccepts snaphot as ByteString, enters into map for future chunks
      * creates and return a ByteString chunk
      */
-    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+    private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
         if (followerToSnapshot == null) {
             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
         }
-        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        byte[] nextChunk = followerToSnapshot.getNextChunk();
 
-        LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+        LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
 
         return nextChunk;
     }
@@ -890,25 +892,23 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
         }
 
-        public ByteString getNextChunk() {
+        public byte[] getNextChunk() {
             int snapshotLength = getSnapshotBytes().size();
             int start = incrementOffset();
             int size = context.getConfigParams().getSnapshotChunkSize();
             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
                 size = snapshotLength;
-            } else {
-                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
-                    size = snapshotLength - start;
-                }
+            } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                size = snapshotLength - start;
             }
 
+            byte[] nextChunk = new byte[size];
+            getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+            nextChunkHashCode = Arrays.hashCode(nextChunk);
 
-            LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
-                    snapshotLength, start, size);
-
-            ByteString substring = getSnapshotBytes().substring(start, start + size);
-            nextChunkHashCode = substring.hashCode();
-            return substring;
+            LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
+                    snapshotLength, start, size, nextChunkHashCode);
+            return nextChunk;
         }
 
         /**
index 1290abdff3e4ef0840a5081fe00be633a41b707c..5a275fbb101e75cabe009619127572735f91804c 100644 (file)
@@ -358,9 +358,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
-        LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
-                    logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
-                    installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+        LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
 
         leaderId = installSnapshot.getLeaderId();
 
index d26837f1808306edbabb159ca7f0c13084836f2d..bb7a24248188f0a83b9225a32a9b2a7e3e16f250 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import java.util.Arrays;
 import org.slf4j.Logger;
 
 /**
@@ -36,7 +37,10 @@ public class SnapshotTracker {
      * @return true when the lastChunk is received
      * @throws InvalidChunkException
      */
-    boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+    boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+        LOG.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
+                chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode);
+
         if(sealed){
             throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
         }
@@ -48,19 +52,14 @@ public class SnapshotTracker {
         if(lastChunkHashCode.isPresent()){
             if(lastChunkHashCode.get() != this.lastChunkHashCode){
                 throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
-                        "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+                        "the senders hash code, expected " + this.lastChunkHashCode + " was " + lastChunkHashCode.get());
             }
         }
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Chunk={},collectedChunks.size:{}",
-                    chunkIndex, collectedChunks.size());
-        }
-
         sealed = (chunkIndex == totalChunks);
         lastChunkIndex = chunkIndex;
-        collectedChunks = collectedChunks.concat(chunk);
-        this.lastChunkHashCode = chunk.hashCode();
+        collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk));
+        this.lastChunkHashCode = Arrays.hashCode(chunk);
         return sealed;
     }
 
index 3c9ebf47fd7178eca126ce962504153ca7f8ad21..e5b84ca212c7b90009ba09838222d7498a09e12e 100644 (file)
@@ -27,7 +27,7 @@ public class AbstractRaftRPC implements RaftRPC {
         return term;
     }
 
-    public void setTerm(long term) {
+    protected void setTerm(long term) {
         this.term = term;
     }
 }
index 521a4512c7e49beef57ca56d2ac9f41e0d10a914..ffdfaa6a0ed112ce50eb22d881d2731dd7a69978 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+
 /**
  * Reply for the AppendEntriesRpc message
  */
@@ -31,6 +33,8 @@ public class AppendEntriesReply extends AbstractRaftRPC {
 
     private final short payloadVersion;
 
+    private final short raftVersion = RaftVersions.CURRENT_VERSION;
+
     private final boolean forceInstallSnapshot;
 
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
@@ -50,12 +54,6 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.forceInstallSnapshot = forceInstallSnapshot;
     }
 
-
-    @Override
-    public long getTerm() {
-        return term;
-    }
-
     public boolean isSuccess() {
         return success;
     }
@@ -76,17 +74,18 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         return payloadVersion;
     }
 
-    @Override
-    public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex)
-                .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId)
-                .append(", payloadVersion=").append(", forceInstallSnapshot=").append(forceInstallSnapshot)
-                .append(payloadVersion).append("]");
-        return builder.toString();
+    public short getRaftVersion() {
+        return raftVersion;
     }
 
     public boolean isForceInstallSnapshot() {
         return forceInstallSnapshot;
     }
+
+    @Override
+    public String toString() {
+        return "AppendEntriesReply [term=" + getTerm() + ", success=" + success + ", followerId=" + followerId
+                + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot="
+                + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]";
+    }
 }
index 13636f36d7594322432377b5156fece4d6279bd6..4ba0b6c0da1c27429c29730a7efffc4779f3f06d 100644 (file)
@@ -10,23 +10,33 @@ package org.opendaylight.controller.cluster.raft.messages;
 
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
-public class InstallSnapshot extends AbstractRaftRPC {
-
-    public static final Class<InstallSnapshotMessages.InstallSnapshot> SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
     private static final long serialVersionUID = 1L;
+    public static final Class<InstallSnapshotMessages.InstallSnapshot> SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
 
-    private final String leaderId;
-    private final long lastIncludedIndex;
-    private final long lastIncludedTerm;
-    private final ByteString data;
-    private final int chunkIndex;
-    private final int totalChunks;
-    private final Optional<Integer> lastChunkHashCode;
+    private String leaderId;
+    private long lastIncludedIndex;
+    private long lastIncludedTerm;
+    private byte[] data;
+    private int chunkIndex;
+    private int totalChunks;
+    private Optional<Integer> lastChunkHashCode;
+
+    /**
+     * Empty constructor to satisfy Externalizable.
+     */
+    public InstallSnapshot() {
+    }
 
     public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
-        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
+        long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
@@ -38,11 +48,10 @@ public class InstallSnapshot extends AbstractRaftRPC {
     }
 
     public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
-                           long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+                           long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks) {
         this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
     }
 
-
     public String getLeaderId() {
         return leaderId;
     }
@@ -55,7 +64,7 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return lastIncludedTerm;
     }
 
-    public ByteString getData() {
+    public byte[] getData() {
         return data;
     }
 
@@ -71,47 +80,92 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return lastChunkHashCode;
     }
 
-    public <T extends Object> Object toSerializable(){
-        InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
-                .setTerm(this.getTerm())
-                .setLeaderId(this.getLeaderId())
-                .setChunkIndex(this.getChunkIndex())
-                .setData(this.getData())
-                .setLastIncludedIndex(this.getLastIncludedIndex())
-                .setLastIncludedTerm(this.getLastIncludedTerm())
-                .setTotalChunks(this.getTotalChunks());
-
-        if(lastChunkHashCode.isPresent()){
-            builder.setLastChunkHashCode(lastChunkHashCode.get());
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(RaftVersions.CURRENT_VERSION);
+        out.writeLong(getTerm());
+        out.writeUTF(leaderId);
+        out.writeLong(lastIncludedIndex);
+        out.writeLong(lastIncludedTerm);
+        out.writeInt(chunkIndex);
+        out.writeInt(totalChunks);
+
+        out.writeByte(lastChunkHashCode.isPresent() ? 1 : 0);
+        if(lastChunkHashCode.isPresent()) {
+            out.writeInt(lastChunkHashCode.get().intValue());
         }
-        return builder.build();
-    }
 
-    public static InstallSnapshot fromSerializable (Object o) {
-        InstallSnapshotMessages.InstallSnapshot from =
-            (InstallSnapshotMessages.InstallSnapshot) o;
+        out.writeObject(data);
+    }
 
-        Optional<Integer> lastChunkHashCode = Optional.absent();
-        if(from.hasLastChunkHashCode()){
-            lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort(); // raft version - not currently used
+        setTerm(in.readLong());
+        leaderId = in.readUTF();
+        lastIncludedIndex = in.readLong();
+        lastIncludedTerm = in.readLong();
+        chunkIndex = in.readInt();
+        totalChunks = in.readInt();
+
+        lastChunkHashCode = Optional.absent();
+        boolean chunkHashCodePresent = in.readByte() == 1;
+        if(chunkHashCodePresent) {
+            lastChunkHashCode = Optional.of(in.readInt());
         }
 
-        InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
-            from.getLeaderId(), from.getLastIncludedIndex(),
-            from.getLastIncludedTerm(), from.getData(),
-            from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+        data = (byte[])in.readObject();
+    }
 
-        return installSnapshot;
+    public <T extends Object> Object toSerializable(short version) {
+        if(version >= RaftVersions.CURRENT_VERSION) {
+            return this;
+        } else {
+            InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+                    .setTerm(this.getTerm())
+                    .setLeaderId(this.getLeaderId())
+                    .setChunkIndex(this.getChunkIndex())
+                    .setData(ByteString.copyFrom(getData()))
+                    .setLastIncludedIndex(this.getLastIncludedIndex())
+                    .setLastIncludedTerm(this.getLastIncludedTerm())
+                    .setTotalChunks(this.getTotalChunks());
+
+            if(lastChunkHashCode.isPresent()){
+                builder.setLastChunkHashCode(lastChunkHashCode.get());
+            }
+            return builder.build();
+        }
     }
 
     @Override
     public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("InstallSnapshot [term=").append(term).append(", leaderId=").append(leaderId)
-                .append(", lastIncludedIndex=").append(lastIncludedIndex).append(", lastIncludedTerm=")
-                .append(lastIncludedTerm).append(", data=").append(data).append(", chunkIndex=").append(chunkIndex)
-                .append(", totalChunks=").append(totalChunks).append(", lastChunkHashCode=").append(lastChunkHashCode)
-                .append("]");
-        return builder.toString();
+        return "InstallSnapshot [term=" + getTerm() + ", leaderId=" + leaderId + ", lastIncludedIndex="
+                + lastIncludedIndex + ", lastIncludedTerm=" + lastIncludedTerm + ", datasize=" + data.length
+                + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode + "]";
+    }
+
+    public static InstallSnapshot fromSerializable (Object o) {
+        if(o instanceof InstallSnapshot) {
+            return (InstallSnapshot)o;
+        } else {
+            InstallSnapshotMessages.InstallSnapshot from =
+                    (InstallSnapshotMessages.InstallSnapshot) o;
+
+            Optional<Integer> lastChunkHashCode = Optional.absent();
+            if(from.hasLastChunkHashCode()){
+                lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+            }
+
+            InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+                    from.getLeaderId(), from.getLastIncludedIndex(),
+                    from.getLastIncludedTerm(), from.getData().toByteArray(),
+                    from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+
+            return installSnapshot;
+        }
+    }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof InstallSnapshot || message instanceof InstallSnapshotMessages.InstallSnapshot;
     }
 }
index bf55fa7aca01e1612e1384e007c34c4582da2308..8536528c74ce46719cddd92fff966c7d187ad7d1 100644 (file)
@@ -158,6 +158,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
     }
 
+    protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+
     protected final Logger testLog = LoggerFactory.getLogger(getClass());
 
     protected final TestActorFactory factory = new TestActorFactory(getSystem());
@@ -204,6 +206,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         configParams.setSnapshotBatchCount(snapshotBatchCount);
         configParams.setSnapshotDataThresholdPercentage(70);
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
         return configParams;
     }
 
index 89d69886edd292e80eb08c9851d40a7313d29b40..03387abcd5cca234649b9f58796a86992e9b8b00 100644 (file)
@@ -498,27 +498,52 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         List<ReplicatedLogEntry> unAppliedEntry;
         ApplySnapshot applySnapshot;
         InstallSnapshot installSnapshot;
-        InstallSnapshotReply installSnapshotReply;
 
         testLog.info("testInstallSnapshotToLaggingFollower starting");
 
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+
         // Now stop dropping AppendEntries in follower 2.
         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
+
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
+        // the snapshot store because the second snapshot was initiated by the follower install snapshot and
+        // not because the batch count was reached so the persisted journal sequence number wasn't advanced
+        // far enough to cause the previous snapshot to be deleted. This is because
+        // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
+        // This is OK - the next snapshot should delete it. In production, even if the system restarted
+        // before another snapshot, they would both get applied which wouldn't hurt anything.
+        persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+        Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+        Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
+        unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
+        assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+
+        int snapshotSize = persistedSnapshot.getState().length;
+        int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
+
         installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
         assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
         assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
         assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
-        assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
+        assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
         assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
         //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
-        installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
-        assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
-        assertEquals("InstallSnapshotReply getChunkIndex", 1, installSnapshotReply.getChunkIndex());
-        assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
-        assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+        List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
+                leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
+        int index = 1;
+        for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
+            assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
+            assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
+            assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
+            assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+        }
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
@@ -536,20 +561,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // the log. In addition replicatedToAllIndex should've advanced.
         verifyLeadersTrimmedLog(lastAppliedIndex);
 
-        // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
-        // the snapshot store because the second snapshot was initiated by the follower install snapshot and
-        // not because the batch count was reached so the persisted journal sequence number wasn't advanced
-        // far enough to cause the previous snapshot to be deleted. This is because
-        // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
-        // This is OK - the next snapshot should delete it. In production, even if the system restarted
-        // before another snapshot, they would both get applied which wouldn't hurt anything.
-        persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
-        Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
-        Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
-        unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
-        assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
-
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
index 1b15ecb135a89f87c212ecf3a080cf8181ba921e..537101e8dc013ac4bc5e5254f1908995b19433e1 100644 (file)
@@ -763,7 +763,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         InstallSnapshot lastInstallSnapshot = null;
 
         for(int i = 0; i < totalChunks; i++) {
-            ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
                     chunkData, chunkIndex, totalChunks);
             follower.handleMessage(leaderActor, lastInstallSnapshot);
@@ -830,7 +830,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertTrue(totalChunks > 1);
 
         // Send an install snapshot with the first chunk to start the process of installing a snapshot
-        ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
                 chunkData, 1, totalChunks));
 
@@ -871,7 +871,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         InstallSnapshot lastInstallSnapshot = null;
 
         for(int i = 0; i < totalChunks; i++) {
-            ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
                     chunkData, chunkIndex, totalChunks);
             follower.handleMessage(leaderActor, lastInstallSnapshot);
@@ -983,7 +983,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
     }
 
-    public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
+    public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
         int snapshotLength = bs.size();
         int start = offset;
         int size = chunkSize;
@@ -994,7 +994,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 size = snapshotLength - start;
             }
         }
-        return bs.substring(start, start + size);
+
+        byte[] nextChunk = new byte[size];
+        bs.copyTo(nextChunk, start, 0, size);
+        return nextChunk;
     }
 
     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
index 50fd2fe358303a1a8f3863548b98eefe1c6b2529..cbbcdc9063dddea1e1890fe34bb127d21106dc12 100644 (file)
@@ -23,6 +23,7 @@ import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -37,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
@@ -1125,7 +1127,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(3, installSnapshot.getTotalChunks());
         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
 
-        int hashCode = installSnapshot.getData().hashCode();
+        int hashCode = Arrays.hashCode(installSnapshot.getData());
 
         followerActor.underlyingActor().clear();
 
@@ -1176,8 +1178,8 @@ public class LeaderTest extends AbstractLeaderTest {
                 j = barray.length;
             }
 
-            ByteString chunk = fts.getNextChunk();
-            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            byte[] chunk = fts.getNextChunk();
+            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
 
             fts.markSendStatus(true);
@@ -1658,7 +1660,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+        assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
 
         short payloadVersion = 5;
         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
@@ -1685,8 +1690,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
 
-        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals(2, followerInfo.getMatchIndex());
+        assertEquals(3, followerInfo.getNextIndex());
         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+        assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
     }
 
     @Test
index 80348263c86c9626a96765afe5953f2820c26e9d..c7d98b083cd1cc85323eb5f72a489d8cdfdfda49 100644 (file)
@@ -14,6 +14,7 @@ import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import org.junit.Assert;
@@ -28,9 +29,9 @@ public class SnapshotTrackerTest {
 
     Map<String, String> data;
     ByteString byteString;
-    ByteString chunk1;
-    ByteString chunk2;
-    ByteString chunk3;
+    byte[] chunk1;
+    byte[] chunk2;
+    byte[] chunk3;
 
     @Before
     public void setup(){
@@ -128,9 +129,9 @@ public class SnapshotTrackerTest {
 
         SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
 
-        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
-        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
-        tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+        tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+        tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
+        tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
 
         byte[] snapshot = tracker2.getSnapshot();
 
@@ -141,15 +142,15 @@ public class SnapshotTrackerTest {
     public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
         SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
 
-        ByteString chunks = chunk1.concat(chunk2);
+        ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
 
-        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
-        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+        tracker1.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+        tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
 
         assertEquals(chunks, tracker1.getCollectedChunks());
     }
 
-    public ByteString getNextChunk (ByteString bs, int offset, int size){
+    public byte[] getNextChunk (ByteString bs, int offset, int size){
         int snapshotLength = bs.size();
         int start = offset;
         if (size > snapshotLength) {
@@ -159,7 +160,10 @@ public class SnapshotTrackerTest {
                 size = snapshotLength - start;
             }
         }
-        return bs.substring(start, start + size);
+
+        byte[] nextChunk = new byte[size];
+        bs.copyTo(nextChunk, start, 0, size);
+        return nextChunk;
     }
 
     private static ByteString toByteString(Map<String, String> state) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java
new file mode 100644 (file)
index 0000000..1751f71
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.messages;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import com.google.common.base.Optional;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+
+/**
+ * Unit tests for InstallSnapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class InstallSnapshotTest {
+
+    @Test
+    public void testSerialization() {
+        byte[] data = new byte[1000];
+        int j = 0;
+        for(int i = 0; i < data.length; i++) {
+            data[i] = (byte)j;
+            if(++j >= 255) {
+                j = 0;
+            }
+        }
+
+        InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+
+        Object serialized = expected.toSerializable(RaftVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", InstallSnapshot.class, serialized.getClass());
+
+        InstallSnapshot actual = InstallSnapshot.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        verifyInstallSnapshot(expected, actual);
+
+        expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6);
+        actual = InstallSnapshot.fromSerializable(SerializationUtils.clone(
+                (Serializable) expected.toSerializable(RaftVersions.CURRENT_VERSION)));
+        verifyInstallSnapshot(expected, actual);
+    }
+
+    @Test
+    public void testSerializationWithPreBeSR3Version() {
+        byte[] data = {0,1,2,3,4,5,7,8,9};
+        InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+
+        Object serialized = expected.toSerializable(RaftVersions.LITHIUM_VERSION);
+        assertEquals("Serialized type", InstallSnapshot.SERIALIZABLE_CLASS, serialized.getClass());
+
+        InstallSnapshot actual = InstallSnapshot.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        verifyInstallSnapshot(expected, actual);
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, InstallSnapshot.isSerializedType(
+                InstallSnapshotMessages.InstallSnapshot.newBuilder().build()));
+        assertEquals("isSerializedType", true, InstallSnapshot.isSerializedType(new InstallSnapshot()));
+        assertEquals("isSerializedType", false, InstallSnapshot.isSerializedType(new Object()));
+    }
+
+    private void verifyInstallSnapshot(InstallSnapshot expected, InstallSnapshot actual) {
+        assertEquals("getTerm", expected.getTerm(), actual.getTerm());
+        assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
+        assertEquals("getTotalChunks", expected.getTotalChunks(), actual.getTotalChunks());
+        assertEquals("getLastIncludedTerm", expected.getLastIncludedTerm(), actual.getLastIncludedTerm());
+        assertEquals("getLastIncludedIndex", expected.getLastIncludedIndex(), actual.getLastIncludedIndex());
+        assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
+        assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
+        assertArrayEquals("getData", expected.getData(), actual.getData());
+        assertEquals("getLastChunkHashCode present", expected.getLastChunkHashCode().isPresent(),
+                actual.getLastChunkHashCode().isPresent());
+        if(expected.getLastChunkHashCode().isPresent()) {
+            assertEquals("getLastChunkHashCode", expected.getLastChunkHashCode().get(),
+                    actual.getLastChunkHashCode().get());
+        }
+    }
+}
index 9915911f784cb9b13523962207215cecdcca77ef..8552216566e729d5786f3c3502eb5139a9f5d600 100644 (file)
@@ -46,7 +46,7 @@ public class DummyShard extends UntypedActor{
         } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
             AppendEntries req = AppendEntries.fromSerializable(o);
             handleAppendEntries(req);
-        } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
+        } else if(InstallSnapshot.isSerializedType(o)) {
             InstallSnapshot req = InstallSnapshot.fromSerializable(o);
             handleInstallSnapshot(req);
         } else if(o instanceof InstallSnapshot){