Bug 3020: Add version to AppendEntries and AppendEntriesReply 24/20524/2
authorTom Pantelis <tpanteli@brocade.com>
Fri, 24 Apr 2015 15:17:27 +0000 (11:17 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 16 May 2015 02:22:00 +0000 (02:22 +0000)
To handle backwards compatibility, a leader needs to know the version of
its followers wrt to the derived RaftActor's payload data. This will
enable
the derived RaftActor to translate its payload data to an older version.
So I added a version field to AppendEntriesReply which the leader stores
in the FollowerLogInformation.

In addition, a follower needs to know the version of its leader so we
can handle backwards compatibility wrt to transactions since we no
longer send the CreateTransaction message to the leader (currently only
for write-only). This patch adds a version field to AppendEntries - a
subsequent patch will utilize it.

Change-Id: I41632024a270206153e7c5d363ee1c79800e4200
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 655216a6c75aa29d31c4c56c56a5000db56ba233)

26 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java
opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java

index 5ab3f69bea994d60644cca47b9694205ae2ab039..aa2b26d9c8e6d456c0d0ea8964da24d519be2cb4 100644 (file)
@@ -47,7 +47,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
 
     public ExampleActor(String id, Map<String, String> peerAddresses,
         Optional<ConfigParams> configParams) {
 
     public ExampleActor(String id, Map<String, String> peerAddresses,
         Optional<ConfigParams> configParams) {
-        super(id, peerAddresses, configParams);
+        super(id, peerAddresses, configParams, (short)0);
         setPersistence(true);
         roleChangeNotifier = createRoleChangeNotifier(id);
     }
         setPersistence(true);
         roleChangeNotifier = createRoleChangeNotifier(id);
     }
index 07b6b617aaa862b472fb479be247479e1096f434..c5524bc1673801680cb002757b6b8edc004bb629 100644 (file)
@@ -97,4 +97,14 @@ public interface FollowerLogInformation {
      * @return true if it is ok to replicate
      */
     boolean okToReplicate();
      * @return true if it is ok to replicate
      */
     boolean okToReplicate();
+
+    /**
+     * Returns the payload data version of the follower.
+     */
+    short getPayloadVersion();
+
+    /**
+     * Sets the payload data version of the follower.
+     */
+    void setPayloadVersion(short payloadVersion);
 }
 }
index bcfd472bf6c394ab88b5e10ced1a0263be01e9f6..5525d75b7dcf3b6ec2d2ddcaca32f3b59d518d70 100644 (file)
@@ -26,6 +26,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
 
 
     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
 
+    private short payloadVersion = -1;
 
     public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
 
     public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
@@ -143,4 +144,14 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
                 .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
                 .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
+
+    @Override
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
+    @Override
+    public void setPayloadVersion(short payloadVersion) {
+        this.payloadVersion = payloadVersion;
+    }
 }
 }
index b1ae4587c44eae1e6f85f331d133d41e7c721bea..5dc1b9dcdf8877132b4978f1cde7f47da32e3bc7 100644 (file)
@@ -114,12 +114,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
 
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
-    public RaftActor(String id, Map<String, String> peerAddresses) {
-        this(id, peerAddresses, Optional.<ConfigParams>absent());
-    }
-
     public RaftActor(String id, Map<String, String> peerAddresses,
     public RaftActor(String id, Map<String, String> peerAddresses,
-         Optional<ConfigParams> configParams) {
+         Optional<ConfigParams> configParams, short payloadVersion) {
 
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
 
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
@@ -127,6 +123,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
             delegatingPersistenceProvider, LOG);
 
             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
             delegatingPersistenceProvider, LOG);
 
+        context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
     }
 
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
     }
 
index 7198876ca685fbab60c7983df909c0f05551a0f9..bdb1cd93c6c076a6d1ad1f141330aa2156ab2539 100644 (file)
@@ -180,4 +180,5 @@ public interface RaftActorContext {
     @VisibleForTesting
     void setTotalMemoryRetriever(Supplier<Long> retriever);
 
     @VisibleForTesting
     void setTotalMemoryRetriever(Supplier<Long> retriever);
 
+    short getPayloadVersion();
 }
 }
index 049b91c416a5ea9fc138a6d094c25f4cc23a99c1..8fa579a5e7675e17f287c0d82a36a6567e81f983 100644 (file)
@@ -51,6 +51,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final DataPersistenceProvider persistenceProvider;
 
 
     private final DataPersistenceProvider persistenceProvider;
 
+    private short payloadVersion;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
@@ -66,6 +68,15 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
         this.LOG = logger;
     }
 
+    void setPayloadVersion(short payloadVersion) {
+        this.payloadVersion = payloadVersion;
+    }
+
+    @Override
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
     void setConfigParams(ConfigParams configParams) {
         this.configParams = configParams;
     }
     void setConfigParams(ConfigParams configParams) {
         this.configParams = configParams;
     }
index bdfdd9b3765c576495e5bbf96dcdb19958c73cd5..c2d9edd93ab6a7fe7e5bab03a16d8a993ce5c90c 100644 (file)
@@ -178,6 +178,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
 
         followerLogInformation.markFollowerActive();
         }
 
         followerLogInformation.markFollowerActive();
+        followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
 
         boolean updated = false;
         if (appendEntriesReply.isSuccess()) {
 
         boolean updated = false;
         if (appendEntriesReply.isSuccess()) {
@@ -526,7 +527,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
-            context.getCommitIndex(), super.getReplicatedToAllIndex());
+            context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
 
         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
             LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
 
         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
             LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
index c276d32cce33d5b5bfada40f7f62afb6244a2e07..9e4ae6f6a5f6ea912964cec51e060de673e691a7 100644 (file)
@@ -130,7 +130,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
 
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
-                    lastIndex(), lastTerm()), actor()
+                    lastIndex(), lastTerm(), context.getPayloadVersion()), actor()
             );
             return this;
         }
             );
             return this;
         }
index a6722e6ff98dbbe9ab68df6c9e04915c23c8721a..150f300a4680aee0499e962efac85505e671c527 100644 (file)
@@ -104,7 +104,7 @@ public class Follower extends AbstractRaftActorBehavior {
         if (snapshotTracker != null) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
         if (snapshotTracker != null) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                    lastIndex(), lastTerm());
+                    lastIndex(), lastTerm(), context.getPayloadVersion());
 
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
 
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
@@ -168,7 +168,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         logName(), lastIndex, lastTerm());
 
             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
                         logName(), lastIndex, lastTerm());
 
             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                    lastTerm()), actor());
+                    lastTerm(), context.getPayloadVersion()), actor());
             return this;
         }
 
             return this;
         }
 
@@ -250,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
         }
 
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-            lastIndex, lastTerm());
+            lastIndex, lastTerm(), context.getPayloadVersion());
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
index d2ea0c50cd6203fd6b142793612e4e0546000185..6c1d6b47c73420b417db651ed667417449e0bbbb 100644 (file)
@@ -53,8 +53,10 @@ public class AppendEntries extends AbstractRaftRPC {
     // index which has been replicated successfully to all followers, -1 if none
     private final long replicatedToAllIndex;
 
     // index which has been replicated successfully to all followers, -1 if none
     private final long replicatedToAllIndex;
 
-    public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
+    private final short payloadVersion;
+
+    public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm,
+            List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) {
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
@@ -62,6 +64,7 @@ public class AppendEntries extends AbstractRaftRPC {
         this.entries = entries;
         this.leaderCommit = leaderCommit;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.entries = entries;
         this.leaderCommit = leaderCommit;
         this.replicatedToAllIndex = replicatedToAllIndex;
+        this.payloadVersion = payloadVersion;
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -75,7 +78,7 @@ public class AppendEntries extends AbstractRaftRPC {
     }
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
     }
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.readShort(); // version
+        in.readShort(); // raft version
 
         in.defaultReadObject();
 
 
         in.defaultReadObject();
 
@@ -110,14 +113,17 @@ public class AppendEntries extends AbstractRaftRPC {
         return replicatedToAllIndex;
     }
 
         return replicatedToAllIndex;
     }
 
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
 
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
 
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("AppendEntries [term=").append(term).append(", leaderId=").append(leaderId)
-                .append(", prevLogIndex=").append(prevLogIndex).append(", prevLogTerm=").append(prevLogTerm)
-                .append(", entries=").append(entries).append(", leaderCommit=").append(leaderCommit)
-                .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append("]");
+        builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex)
+                .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit)
+                .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=")
+                .append(payloadVersion).append("]");
         return builder.toString();
     }
 
         return builder.toString();
     }
 
@@ -208,7 +214,7 @@ public class AppendEntries extends AbstractRaftRPC {
             from.getPrevLogIndex(),
             from.getPrevLogTerm(),
             logEntryList,
             from.getPrevLogIndex(),
             from.getPrevLogTerm(),
             logEntryList,
-            from.getLeaderCommit(), -1);
+            from.getLeaderCommit(), -1, (short)0);
 
         return to;
     }
 
         return to;
     }
index 01fef006a922a5b69290df2eaf19a4017ead65d9..990605b288201c4d0d3156c65c8077d326d25cdf 100644 (file)
@@ -29,13 +29,17 @@ public class AppendEntriesReply extends AbstractRaftRPC {
     // responding
     private final String followerId;
 
     // responding
     private final String followerId;
 
-    public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) {
+    private final short payloadVersion;
+
+    public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
+            short payloadVersion) {
         super(term);
 
         this.followerId = followerId;
         this.success = success;
         this.logLastIndex = logLastIndex;
         this.logLastTerm = logLastTerm;
         super(term);
 
         this.followerId = followerId;
         this.success = success;
         this.logLastIndex = logLastIndex;
         this.logLastTerm = logLastTerm;
+        this.payloadVersion = payloadVersion;
     }
 
     @Override
     }
 
     @Override
@@ -59,12 +63,16 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         return followerId;
     }
 
         return followerId;
     }
 
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("AppendEntriesReply [term=").append(term).append(", success=").append(success)
-                .append(", logLastIndex=").append(logLastIndex).append(", logLastTerm=").append(logLastTerm)
-                .append(", followerId=").append(followerId).append("]");
+        builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex)
+                .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId)
+                .append(", payloadVersion=").append(payloadVersion).append("]");
         return builder.toString();
     }
 }
         return builder.toString();
     }
 }
index 586ca8cda05fac488d448949c0840ead847432df..a6853caf9ea78fbbd45421a049858ec948e24678 100644 (file)
@@ -70,7 +70,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
 
     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
                          DataPersistenceProvider dataPersistenceProvider) {
 
     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
                          DataPersistenceProvider dataPersistenceProvider) {
-        super(id, peerAddresses, config);
+        super(id, peerAddresses, config, (short) 0);
         state = new ArrayList<>();
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
         state = new ArrayList<>();
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
index 4aa3b2fb4e62349f8c96b06e235b5ea8e7378bcf..01ff6ce14f4eb44930f13dc5661ad7c6ab9c82fc 100644 (file)
@@ -41,6 +41,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private boolean snapshotCaptureInitiated;
     private SnapshotManager snapshotManager;
     private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
     private boolean snapshotCaptureInitiated;
     private SnapshotManager snapshotManager;
     private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
+    private short payloadVersion;
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
@@ -232,6 +233,15 @@ public class MockRaftActorContext implements RaftActorContext {
         this.persistenceProvider = persistenceProvider;
     }
 
         this.persistenceProvider = persistenceProvider;
     }
 
+    @Override
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
+    public void setPayloadVersion(short payloadVersion) {
+        this.payloadVersion = payloadVersion;
+    }
+
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
index bdb1d6052c852edfee33ac2ca5a20b1e43bdabb4..cca9a2508e5caae49d6fd97086da5e037a46113d 100644 (file)
@@ -547,13 +547,13 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
                 //fake snapshot on index 5
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
                 //fake snapshot on index 5
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 //fake snapshot on index 6
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 //fake snapshot on index 6
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
@@ -584,7 +584,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
 
                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
 
                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
                 assertEquals(2, leaderActor.getReplicatedLog().size());
                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
 
                 assertEquals(2, leaderActor.getReplicatedLog().size());
                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
 
@@ -650,7 +650,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                         new MockRaftActorContext.MockPayload("foo-6"))
                         );
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                         new MockRaftActorContext.MockPayload("foo-6"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
                 assertEquals(7, followerActor.getReplicatedLog().size());
 
                 //fake snapshot on index 7
                 assertEquals(7, followerActor.getReplicatedLog().size());
 
                 //fake snapshot on index 7
@@ -661,7 +661,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0));
                 assertEquals(8, followerActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
                 assertEquals(8, followerActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
@@ -689,7 +689,7 @@ public class RaftActorTest extends AbstractActorTest {
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
                 // send an additional entry 8 with leaderCommit = 7
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
                 // send an additional entry 8 with leaderCommit = 7
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0));
 
                 // 7 and 8, as lastapplied is 7
                 assertEquals(2, followerActor.getReplicatedLog().size());
 
                 // 7 and 8, as lastapplied is 7
                 assertEquals(2, followerActor.getReplicatedLog().size());
@@ -747,12 +747,12 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
@@ -765,7 +765,7 @@ public class RaftActorTest extends AbstractActorTest {
 
 
                 //reply from a slow follower does not initiate a fake snapshot
 
 
                 //reply from a slow follower does not initiate a fake snapshot
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
@@ -780,7 +780,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0));
                 assertEquals(0, leaderActor.getReplicatedLog().size());
             }
         };
                 assertEquals(0, leaderActor.getReplicatedLog().size());
             }
         };
index f56755b447d347981b687cf017eded1caa059ea6..d3f5a0eead1ff3ccdd2e69f215eccfb6886143b4 100644 (file)
@@ -81,28 +81,31 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      */
     @Test
     public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
      */
     @Test
     public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
-            MockRaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
+        short payloadVersion = 5;
+        context.setPayloadVersion(payloadVersion);
 
 
-            // First set the receivers term to a high number (1000)
-            context.getTermInformation().update(1000, "test");
+        // First set the receivers term to a high number (1000)
+        context.getTermInformation().update(1000, "test");
 
 
-            AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
+        AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1, (short)4);
 
 
-            behavior = createBehavior(context);
+        behavior = createBehavior(context);
 
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+        // Send an unknown message so that the state of the RaftActor remains unchanged
+        RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
 
 
-            RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
+        RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
 
 
-            assertEquals("Raft state", expected.state(), raftBehavior.state());
+        assertEquals("Raft state", expected.state(), raftBehavior.state());
 
 
-            // Also expect an AppendEntriesReply to be sent where success is false
+        // Also expect an AppendEntriesReply to be sent where success is false
 
 
-            AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
-                    behaviorActor, AppendEntriesReply.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
+                behaviorActor, AppendEntriesReply.class);
 
 
-            assertEquals("isSuccess", false, reply.isSuccess());
+        assertEquals("isSuccess", false, reply.isSuccess());
+        assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
     }
 
 
     }
 
 
@@ -119,7 +122,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
 
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
 
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
 
         behavior = createBehavior(context);
 
 
         behavior = createBehavior(context);
 
@@ -314,11 +317,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     protected AppendEntries createAppendEntriesWithNewerTerm() {
     }
 
     protected AppendEntries createAppendEntriesWithNewerTerm() {
-        return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
+        return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1, (short)0);
     }
 
     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
     }
 
     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
-        return new AppendEntriesReply("follower-1", 100, false, 100, 100);
+        return new AppendEntriesReply("follower-1", 100, false, 100, 100, (short)0);
     }
 
     protected RequestVote createRequestVoteWithNewerTerm() {
     }
 
     protected RequestVote createRequestVoteWithNewerTerm() {
index 63fd530675a78e9f4ec0c35c67e394e210ae9fd3..2dc2b21c166eff2cf358df3ad3cd45b6beb6fab1 100644 (file)
@@ -127,7 +127,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
         setupPeers(1);
         candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0,
 
         setupPeers(1);
         candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0,
-                Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)0));
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
                 peerActors[0], AppendEntriesReply.class);
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
                 peerActors[0], AppendEntriesReply.class);
index c9cec158376faa6484f6c498bdaf41252875e411..443ebc31a4ae49380f92d3bfa341879882d137e5 100644 (file)
@@ -48,6 +48,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private RaftActorBehavior follower;
 
 
     private RaftActorBehavior follower;
 
+    private final short payloadVersion = 5;
+
     @Override
     @After
     public void tearDown() throws Exception {
     @Override
     @After
     public void tearDown() throws Exception {
@@ -70,7 +72,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     @Override
     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
 
     @Override
     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
-        return new MockRaftActorContext("follower", getSystem(), actorRef);
+        MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
+        context.setPayloadVersion(payloadVersion );
+        return context;
     }
 
     @Test
     }
 
     @Test
@@ -139,7 +143,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -159,7 +163,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -180,7 +184,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -209,7 +213,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -229,7 +233,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
-        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -250,7 +254,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -271,7 +275,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -289,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
-        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -323,7 +327,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -349,7 +353,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         // AppendEntries is now sent with a bigger term
         // this will set the receivers term to be the same as the sender's term
 
         // AppendEntries is now sent with a bigger term
         // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
+        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
 
         follower = createBehavior(context);
 
 
         follower = createBehavior(context);
 
@@ -397,7 +401,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
+        AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1, (short)0);
 
         follower = createBehavior(context);
 
 
         follower = createBehavior(context);
 
@@ -444,7 +448,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
 
         follower = createBehavior(context);
 
 
         follower = createBehavior(context);
 
@@ -487,7 +491,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
 
         follower = createBehavior(context);
 
 
         follower = createBehavior(context);
 
@@ -518,7 +522,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
 
         follower = createBehavior(context);
 
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
 
         assertEquals("Next index", 2, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
 
         assertEquals("Next index", 2, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -530,7 +534,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
 
         leaderActor.underlyingActor().clear();
         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
 
         leaderActor.underlyingActor().clear();
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
 
         assertEquals("Next index", 3, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
 
         assertEquals("Next index", 3, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -558,7 +562,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
 
         follower = createBehavior(context);
 
 
         follower = createBehavior(context);
 
@@ -725,7 +729,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -798,6 +802,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+        assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
     }
 
     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
     }
 
     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
index e16d765cdea29a76b0a18440a3128c1a4f2529e6..0eb58d427afa33c62a8c73c4e77b3aeb21db84a4 100644 (file)
@@ -78,7 +78,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
         // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1));
+                        isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
 
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
 
@@ -87,7 +87,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
     }
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
     }
@@ -113,13 +113,13 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         // in a 5 member cluster, atleast 2 followers need to be active and return a reply
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
         // in a 5 member cluster, atleast 2 followers need to be active and return a reply
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.IsolatedLeader, behavior.state());
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
 
         assertEquals("Raft state", RaftState.IsolatedLeader, behavior.state());
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
 
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
 
@@ -128,7 +128,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
     }
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
     }
@@ -152,7 +152,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         // bowing itself to another leader in the cluster
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
         // bowing itself to another leader in the cluster
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
-                        isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1));
+                        isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1, (short)0));
 
         assertEquals("Raft state", RaftState.Follower, behavior.state());
 
 
         assertEquals("Raft state", RaftState.Follower, behavior.state());
 
index 0255020328655dfe0dee151207f66f9a51eddcc5..9094a656540da3fd422e8a445288339a1ce69c8a 100644 (file)
@@ -85,6 +85,8 @@ public class LeaderTest extends AbstractLeaderTest {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        short payloadVersion = (short)5;
+        actorContext.setPayloadVersion(payloadVersion);
 
         long term = 1;
         actorContext.getTermInformation().update(term, "");
 
         long term = 1;
         actorContext.getTermInformation().update(term, "");
@@ -98,10 +100,11 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
         assertEquals("Entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
         assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
 
         // The follower would normally reply - simulate that explicitly here.
         leader.handleMessage(followerActor, new AppendEntriesReply(
 
         // The follower would normally reply - simulate that explicitly here.
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex - 1, term));
+                FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -118,6 +121,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
     }
 
 
     }
 
 
@@ -146,7 +150,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -192,7 +196,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -232,7 +236,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -240,7 +244,7 @@ public class LeaderTest extends AbstractLeaderTest {
         for(int i=0;i<3;i++) {
             sendReplicate(actorContext, lastIndex+i+1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
         for(int i=0;i<3;i++) {
             sendReplicate(actorContext, lastIndex+i+1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
-                    FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+                    FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
 
         }
 
 
         }
 
@@ -282,7 +286,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -327,7 +331,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -364,7 +368,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -1105,12 +1109,12 @@ public class LeaderTest extends AbstractLeaderTest {
         leader = new Leader(leaderActorContext);
 
         // Send initial heartbeat reply with last index.
         leader = new Leader(leaderActorContext);
 
         // Send initial heartbeat reply with last index.
-        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0));
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
 
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -1134,7 +1138,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+        short payloadVersion = 5;
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -1157,6 +1162,9 @@ public class LeaderTest extends AbstractLeaderTest {
         ApplyState applyState = applyStateList.get(0);
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
         ApplyState applyState = applyStateList.get(0);
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals(payloadVersion, followerInfo.getPayloadVersion());
     }
 
     @Test
     }
 
     @Test
@@ -1167,7 +1175,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -1373,7 +1381,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
             for(int i=1;i<6;i++) {
                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
 
             for(int i=1;i<6;i++) {
                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
-                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
                 assertTrue(newBehavior == leader);
                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
             }
                 assertTrue(newBehavior == leader);
                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
             }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java
new file mode 100644 (file)
index 0000000..0f1bee5
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for AppendEntriesReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class AppendEntriesReplyTest {
+
+    @Test
+    public void testSerialization() {
+        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6);
+        AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
+
+        assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
+        assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId());
+        assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
+        assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
+        assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+    }
+}
index 5f5d73dbe6b126028fdce788d29f3a8cf7cc75c3..286cd3c9c4e0abec4108f4113fd27f5ea5b62e1e 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft.messages;
 
  */
 package org.opendaylight.controller.cluster.raft.messages;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -19,9 +21,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
 /**
  * Unit tests for AppendEntries.
  *
 /**
  * Unit tests for AppendEntries.
  *
@@ -35,7 +34,9 @@ public class AppendEntriesTest {
 
         ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
 
 
         ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
 
-        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
+        short payloadVersion = 5;
+        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+                -1, payloadVersion);
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
@@ -45,7 +46,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromSerializable() {
         AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
     @Test
     public void testToAndFromSerializable() {
         AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
-                Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
+                Collections.<ReplicatedLogEntry>emptyList(), 10L, -1, (short)0);
 
         assertSame("toSerializable", entries, entries.toSerializable());
         assertSame("fromSerializable", entries,
 
         assertSame("toSerializable", entries, entries.toSerializable());
         assertSame("fromSerializable", entries,
@@ -55,7 +56,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromLegacySerializable() {
         ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
     @Test
     public void testToAndFromLegacySerializable() {
         ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
-        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
+        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1, (short)0);
 
         Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
         Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
 
         Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
         Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
@@ -73,6 +74,7 @@ public class AppendEntriesTest {
         assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
         assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
         assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
         assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
         assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
         assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
+        assertEquals("getPayloadVersion", expected.getPayloadVersion(), actual.getPayloadVersion());
 
         assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
         Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
 
         assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
         Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
index 148fa1881b836252417cdccd92ea5440950f82c1..14a20da247f5138b17e3c30e437bf3c661e5ec25 100644 (file)
@@ -116,7 +116,8 @@ public class Shard extends RaftActor {
 
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
 
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
+        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
+                DataStoreVersions.CURRENT_VERSION);
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
index ce7d6303ad13985fa4c08b4fd152cdae52cbd360..98f71cad4a9b5915d02cfb1519c1d1c0424d0a7f 100644 (file)
@@ -70,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest {
 
         entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
 
         entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
-        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1, (short)0).toSerializable());
     }
 }
     }
 }
index 90b978821f2a10860e6006a4047a97ed7b667fa3..dc2f8c4f4b279dbe1b284082951c0aa7903a94b0 100644 (file)
@@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest {
         });
 
         AppendEntries appendEntries =
         });
 
         AppendEntries appendEntries =
-            new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
+            new AppendEntries(1, "member-1", 0, 100, entries, 1, -1, (short)0);
 
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
 
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
index 28fc6b0f57ed950479ba2737a4a2aef26e450475..290e23352cee3995d8391ab8d79aa12adf7b9f49 100644 (file)
@@ -98,7 +98,7 @@ public class Client {
             }
         });
 
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
     }
 
     public static AppendEntries keyValueAppendEntries() {
     }
 
     public static AppendEntries keyValueAppendEntries() {
@@ -123,6 +123,6 @@ public class Client {
             }
         });
 
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
     }
 }
     }
 }
index 0b72a32f1033884ba983c71b2651a4a88ad6c7b1..9915911f784cb9b13523962207215cecdcca77ef 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 import akka.japi.Creator;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -101,10 +102,12 @@ public class DummyShard extends UntypedActor{
             if (!ignore) {
                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
                 Thread.sleep(delay);
             if (!ignore) {
                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
                 Thread.sleep(delay);
-                sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+                sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+                        DataStoreVersions.CURRENT_VERSION), self());
             }
         } else {
             }
         } else {
-            sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+            sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+                    DataStoreVersions.CURRENT_VERSION), self());
         }
     }
 
         }
     }