Bug 3020: Add version to AppendEntries and AppendEntriesReply 24/20524/2
authorTom Pantelis <tpanteli@brocade.com>
Fri, 24 Apr 2015 15:17:27 +0000 (11:17 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 16 May 2015 02:22:00 +0000 (02:22 +0000)
To handle backwards compatibility, a leader needs to know the version of
its followers wrt to the derived RaftActor's payload data. This will
enable
the derived RaftActor to translate its payload data to an older version.
So I added a version field to AppendEntriesReply which the leader stores
in the FollowerLogInformation.

In addition, a follower needs to know the version of its leader so we
can handle backwards compatibility wrt to transactions since we no
longer send the CreateTransaction message to the leader (currently only
for write-only). This patch adds a version field to AppendEntries - a
subsequent patch will utilize it.

Change-Id: I41632024a270206153e7c5d363ee1c79800e4200
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 655216a6c75aa29d31c4c56c56a5000db56ba233)

26 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java
opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java

index 5ab3f69..aa2b26d 100644 (file)
@@ -47,7 +47,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
 
     public ExampleActor(String id, Map<String, String> peerAddresses,
         Optional<ConfigParams> configParams) {
-        super(id, peerAddresses, configParams);
+        super(id, peerAddresses, configParams, (short)0);
         setPersistence(true);
         roleChangeNotifier = createRoleChangeNotifier(id);
     }
index 07b6b61..c5524bc 100644 (file)
@@ -97,4 +97,14 @@ public interface FollowerLogInformation {
      * @return true if it is ok to replicate
      */
     boolean okToReplicate();
+
+    /**
+     * Returns the payload data version of the follower.
+     */
+    short getPayloadVersion();
+
+    /**
+     * Sets the payload data version of the follower.
+     */
+    void setPayloadVersion(short payloadVersion);
 }
index bcfd472..5525d75 100644 (file)
@@ -26,6 +26,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
 
+    private short payloadVersion = -1;
 
     public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
@@ -143,4 +144,14 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
                 .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
+
+    @Override
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
+    @Override
+    public void setPayloadVersion(short payloadVersion) {
+        this.payloadVersion = payloadVersion;
+    }
 }
index b1ae458..5dc1b9d 100644 (file)
@@ -114,12 +114,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
-    public RaftActor(String id, Map<String, String> peerAddresses) {
-        this(id, peerAddresses, Optional.<ConfigParams>absent());
-    }
-
     public RaftActor(String id, Map<String, String> peerAddresses,
-         Optional<ConfigParams> configParams) {
+         Optional<ConfigParams> configParams, short payloadVersion) {
 
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
@@ -127,6 +123,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
             delegatingPersistenceProvider, LOG);
 
+        context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
     }
 
index 049b91c..8fa579a 100644 (file)
@@ -51,6 +51,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final DataPersistenceProvider persistenceProvider;
 
+    private short payloadVersion;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
@@ -66,6 +68,15 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
+    void setPayloadVersion(short payloadVersion) {
+        this.payloadVersion = payloadVersion;
+    }
+
+    @Override
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
     void setConfigParams(ConfigParams configParams) {
         this.configParams = configParams;
     }
index bdfdd9b..c2d9edd 100644 (file)
@@ -178,6 +178,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
 
         followerLogInformation.markFollowerActive();
+        followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
 
         boolean updated = false;
         if (appendEntriesReply.isSuccess()) {
@@ -526,7 +527,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
-            context.getCommitIndex(), super.getReplicatedToAllIndex());
+            context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
 
         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
             LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
index c276d32..9e4ae6f 100644 (file)
@@ -130,7 +130,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
-                    lastIndex(), lastTerm()), actor()
+                    lastIndex(), lastTerm(), context.getPayloadVersion()), actor()
             );
             return this;
         }
index a6722e6..150f300 100644 (file)
@@ -104,7 +104,7 @@ public class Follower extends AbstractRaftActorBehavior {
         if (snapshotTracker != null) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                    lastIndex(), lastTerm());
+                    lastIndex(), lastTerm(), context.getPayloadVersion());
 
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
@@ -168,7 +168,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         logName(), lastIndex, lastTerm());
 
             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                    lastTerm()), actor());
+                    lastTerm(), context.getPayloadVersion()), actor());
             return this;
         }
 
@@ -250,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-            lastIndex, lastTerm());
+            lastIndex, lastTerm(), context.getPayloadVersion());
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
index d2ea0c5..6c1d6b4 100644 (file)
@@ -53,8 +53,10 @@ public class AppendEntries extends AbstractRaftRPC {
     // index which has been replicated successfully to all followers, -1 if none
     private final long replicatedToAllIndex;
 
-    public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
+    private final short payloadVersion;
+
+    public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm,
+            List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) {
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
@@ -62,6 +64,7 @@ public class AppendEntries extends AbstractRaftRPC {
         this.entries = entries;
         this.leaderCommit = leaderCommit;
         this.replicatedToAllIndex = replicatedToAllIndex;
+        this.payloadVersion = payloadVersion;
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -75,7 +78,7 @@ public class AppendEntries extends AbstractRaftRPC {
     }
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.readShort(); // version
+        in.readShort(); // raft version
 
         in.defaultReadObject();
 
@@ -110,14 +113,17 @@ public class AppendEntries extends AbstractRaftRPC {
         return replicatedToAllIndex;
     }
 
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
 
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("AppendEntries [term=").append(term).append(", leaderId=").append(leaderId)
-                .append(", prevLogIndex=").append(prevLogIndex).append(", prevLogTerm=").append(prevLogTerm)
-                .append(", entries=").append(entries).append(", leaderCommit=").append(leaderCommit)
-                .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append("]");
+        builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex)
+                .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit)
+                .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=")
+                .append(payloadVersion).append("]");
         return builder.toString();
     }
 
@@ -208,7 +214,7 @@ public class AppendEntries extends AbstractRaftRPC {
             from.getPrevLogIndex(),
             from.getPrevLogTerm(),
             logEntryList,
-            from.getLeaderCommit(), -1);
+            from.getLeaderCommit(), -1, (short)0);
 
         return to;
     }
index 01fef00..990605b 100644 (file)
@@ -29,13 +29,17 @@ public class AppendEntriesReply extends AbstractRaftRPC {
     // responding
     private final String followerId;
 
-    public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) {
+    private final short payloadVersion;
+
+    public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
+            short payloadVersion) {
         super(term);
 
         this.followerId = followerId;
         this.success = success;
         this.logLastIndex = logLastIndex;
         this.logLastTerm = logLastTerm;
+        this.payloadVersion = payloadVersion;
     }
 
     @Override
@@ -59,12 +63,16 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         return followerId;
     }
 
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("AppendEntriesReply [term=").append(term).append(", success=").append(success)
-                .append(", logLastIndex=").append(logLastIndex).append(", logLastTerm=").append(logLastTerm)
-                .append(", followerId=").append(followerId).append("]");
+        builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex)
+                .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId)
+                .append(", payloadVersion=").append(payloadVersion).append("]");
         return builder.toString();
     }
 }
index 586ca8c..a6853ca 100644 (file)
@@ -70,7 +70,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
 
     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
                          DataPersistenceProvider dataPersistenceProvider) {
-        super(id, peerAddresses, config);
+        super(id, peerAddresses, config, (short) 0);
         state = new ArrayList<>();
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
index 4aa3b2f..01ff6ce 100644 (file)
@@ -41,6 +41,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private boolean snapshotCaptureInitiated;
     private SnapshotManager snapshotManager;
     private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
+    private short payloadVersion;
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
@@ -232,6 +233,15 @@ public class MockRaftActorContext implements RaftActorContext {
         this.persistenceProvider = persistenceProvider;
     }
 
+    @Override
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
+    public void setPayloadVersion(short payloadVersion) {
+        this.payloadVersion = payloadVersion;
+    }
+
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
index bdb1d60..cca9a25 100644 (file)
@@ -547,13 +547,13 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
                 //fake snapshot on index 5
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 //fake snapshot on index 6
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
@@ -584,7 +584,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
 
                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
                 assertEquals(2, leaderActor.getReplicatedLog().size());
                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
 
@@ -650,7 +650,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                         new MockRaftActorContext.MockPayload("foo-6"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
                 assertEquals(7, followerActor.getReplicatedLog().size());
 
                 //fake snapshot on index 7
@@ -661,7 +661,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0));
                 assertEquals(8, followerActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
@@ -689,7 +689,7 @@ public class RaftActorTest extends AbstractActorTest {
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
                 // send an additional entry 8 with leaderCommit = 7
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0));
 
                 // 7 and 8, as lastapplied is 7
                 assertEquals(2, followerActor.getReplicatedLog().size());
@@ -747,12 +747,12 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
@@ -765,7 +765,7 @@ public class RaftActorTest extends AbstractActorTest {
 
 
                 //reply from a slow follower does not initiate a fake snapshot
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
@@ -780,7 +780,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0));
                 assertEquals(0, leaderActor.getReplicatedLog().size());
             }
         };
index f56755b..d3f5a0e 100644 (file)
@@ -81,28 +81,31 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      */
     @Test
     public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
-            MockRaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
+        short payloadVersion = 5;
+        context.setPayloadVersion(payloadVersion);
 
-            // First set the receivers term to a high number (1000)
-            context.getTermInformation().update(1000, "test");
+        // First set the receivers term to a high number (1000)
+        context.getTermInformation().update(1000, "test");
 
-            AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
+        AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1, (short)4);
 
-            behavior = createBehavior(context);
+        behavior = createBehavior(context);
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+        // Send an unknown message so that the state of the RaftActor remains unchanged
+        RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
 
-            RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
+        RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
 
-            assertEquals("Raft state", expected.state(), raftBehavior.state());
+        assertEquals("Raft state", expected.state(), raftBehavior.state());
 
-            // Also expect an AppendEntriesReply to be sent where success is false
+        // Also expect an AppendEntriesReply to be sent where success is false
 
-            AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
-                    behaviorActor, AppendEntriesReply.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
+                behaviorActor, AppendEntriesReply.class);
 
-            assertEquals("isSuccess", false, reply.isSuccess());
+        assertEquals("isSuccess", false, reply.isSuccess());
+        assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
     }
 
 
@@ -119,7 +122,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
 
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
 
         behavior = createBehavior(context);
 
@@ -314,11 +317,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     protected AppendEntries createAppendEntriesWithNewerTerm() {
-        return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
+        return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1, (short)0);
     }
 
     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
-        return new AppendEntriesReply("follower-1", 100, false, 100, 100);
+        return new AppendEntriesReply("follower-1", 100, false, 100, 100, (short)0);
     }
 
     protected RequestVote createRequestVoteWithNewerTerm() {
index 63fd530..2dc2b21 100644 (file)
@@ -127,7 +127,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
         setupPeers(1);
         candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0,
-                Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)0));
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
                 peerActors[0], AppendEntriesReply.class);
index c9cec15..443ebc3 100644 (file)
@@ -48,6 +48,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private RaftActorBehavior follower;
 
+    private final short payloadVersion = 5;
+
     @Override
     @After
     public void tearDown() throws Exception {
@@ -70,7 +72,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     @Override
     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
-        return new MockRaftActorContext("follower", getSystem(), actorRef);
+        MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
+        context.setPayloadVersion(payloadVersion );
+        return context;
     }
 
     @Test
@@ -139,7 +143,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -159,7 +163,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -180,7 +184,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -209,7 +213,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -229,7 +233,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
-        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -250,7 +254,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -271,7 +275,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -289,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
-        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -323,7 +327,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -349,7 +353,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         // AppendEntries is now sent with a bigger term
         // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
+        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -397,7 +401,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
+        AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -444,7 +448,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -487,7 +491,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -518,7 +522,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
 
         assertEquals("Next index", 2, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -530,7 +534,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
 
         leaderActor.underlyingActor().clear();
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
 
         assertEquals("Next index", 3, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -558,7 +562,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
 
         follower = createBehavior(context);
 
@@ -725,7 +729,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -798,6 +802,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+        assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
     }
 
     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
index e16d765..0eb58d4 100644 (file)
@@ -78,7 +78,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1));
+                        isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
 
@@ -87,7 +87,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
     }
@@ -113,13 +113,13 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         // in a 5 member cluster, atleast 2 followers need to be active and return a reply
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.IsolatedLeader, behavior.state());
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
 
@@ -128,7 +128,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
 
         behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
-                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+                        isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
 
         assertEquals("Raft state", RaftState.Leader, behavior.state());
     }
@@ -152,7 +152,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         // bowing itself to another leader in the cluster
         RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
                 new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
-                        isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1));
+                        isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1, (short)0));
 
         assertEquals("Raft state", RaftState.Follower, behavior.state());
 
index 0255020..9094a65 100644 (file)
@@ -85,6 +85,8 @@ public class LeaderTest extends AbstractLeaderTest {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        short payloadVersion = (short)5;
+        actorContext.setPayloadVersion(payloadVersion);
 
         long term = 1;
         actorContext.getTermInformation().update(term, "");
@@ -98,10 +100,11 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
         assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
 
         // The follower would normally reply - simulate that explicitly here.
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex - 1, term));
+                FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -118,6 +121,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
     }
 
 
@@ -146,7 +150,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -192,7 +196,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -232,7 +236,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -240,7 +244,7 @@ public class LeaderTest extends AbstractLeaderTest {
         for(int i=0;i<3;i++) {
             sendReplicate(actorContext, lastIndex+i+1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
-                    FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+                    FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
 
         }
 
@@ -282,7 +286,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -327,7 +331,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -364,7 +368,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // The follower would normally reply - simulate that explicitly here.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
         leader.handleMessage(followerActor, new AppendEntriesReply(
-                FOLLOWER_ID, term, true, lastIndex, term));
+                FOLLOWER_ID, term, true, lastIndex, term, (short)0));
         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
         followerActor.underlyingActor().clear();
@@ -1105,12 +1109,12 @@ public class LeaderTest extends AbstractLeaderTest {
         leader = new Leader(leaderActorContext);
 
         // Send initial heartbeat reply with last index.
-        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0));
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -1134,7 +1138,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+        short payloadVersion = 5;
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -1157,6 +1162,9 @@ public class LeaderTest extends AbstractLeaderTest {
         ApplyState applyState = applyStateList.get(0);
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals(payloadVersion, followerInfo.getPayloadVersion());
     }
 
     @Test
@@ -1167,7 +1175,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
-        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
 
         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
@@ -1373,7 +1381,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
             for(int i=1;i<6;i++) {
                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
-                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
                 assertTrue(newBehavior == leader);
                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
             }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java
new file mode 100644 (file)
index 0000000..0f1bee5
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for AppendEntriesReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class AppendEntriesReplyTest {
+
+    @Test
+    public void testSerialization() {
+        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6);
+        AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
+
+        assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
+        assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId());
+        assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
+        assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
+        assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+    }
+}
index 5f5d73d..286cd3c 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft.messages;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -19,9 +21,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
 /**
  * Unit tests for AppendEntries.
  *
@@ -35,7 +34,9 @@ public class AppendEntriesTest {
 
         ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
 
-        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
+        short payloadVersion = 5;
+        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+                -1, payloadVersion);
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
@@ -45,7 +46,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromSerializable() {
         AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
-                Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
+                Collections.<ReplicatedLogEntry>emptyList(), 10L, -1, (short)0);
 
         assertSame("toSerializable", entries, entries.toSerializable());
         assertSame("fromSerializable", entries,
@@ -55,7 +56,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromLegacySerializable() {
         ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
-        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
+        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1, (short)0);
 
         Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
         Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
@@ -73,6 +74,7 @@ public class AppendEntriesTest {
         assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
         assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
         assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
+        assertEquals("getPayloadVersion", expected.getPayloadVersion(), actual.getPayloadVersion());
 
         assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
         Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
index 148fa18..14a20da 100644 (file)
@@ -116,7 +116,8 @@ public class Shard extends RaftActor {
 
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
+        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
+                DataStoreVersions.CURRENT_VERSION);
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
index ce7d630..98f71ca 100644 (file)
@@ -70,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest {
 
         entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
-        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1, (short)0).toSerializable());
     }
 }
index 90b9788..dc2f8c4 100644 (file)
@@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest {
         });
 
         AppendEntries appendEntries =
-            new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
+            new AppendEntries(1, "member-1", 0, 100, entries, 1, -1, (short)0);
 
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
index 28fc6b0..290e233 100644 (file)
@@ -98,7 +98,7 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
     }
 
     public static AppendEntries keyValueAppendEntries() {
@@ -123,6 +123,6 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
     }
 }
index 0b72a32..9915911 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -101,10 +102,12 @@ public class DummyShard extends UntypedActor{
             if (!ignore) {
                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
                 Thread.sleep(delay);
-                sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+                sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+                        DataStoreVersions.CURRENT_VERSION), self());
             }
         } else {
-            sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+            sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+                    DataStoreVersions.CURRENT_VERSION), self());
         }
     }