From: Tom Pantelis Date: Fri, 24 Apr 2015 15:17:27 +0000 (-0400) Subject: Bug 3020: Add version to AppendEntries and AppendEntriesReply X-Git-Tag: release/beryllium~586 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f12d62d2dc28a883c1f1b38df7d72a9142c2abfb Bug 3020: Add version to AppendEntries and AppendEntriesReply To handle backwards compatibility, a leader needs to know the version of its followers wrt to the derived RaftActor's payload data. This will enable the derived RaftActor to translate its payload data to an older version. So I added a version field to AppendEntriesReply which the leader stores in the FollowerLogInformation. In addition, a follower needs to know the version of its leader so we can handle backwards compatibility wrt to transactions since we no longer send the CreateTransaction message to the leader (currently only for write-only). This patch adds a version field to AppendEntries - a subsequent patch will utilize it. Change-Id: I41632024a270206153e7c5d363ee1c79800e4200 Signed-off-by: Tom Pantelis (cherry picked from commit 655216a6c75aa29d31c4c56c56a5000db56ba233) --- diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 5ab3f69bea..aa2b26d9c8 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -47,7 +47,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, public ExampleActor(String id, Map peerAddresses, Optional configParams) { - super(id, peerAddresses, configParams); + super(id, peerAddresses, configParams, (short)0); setPersistence(true); roleChangeNotifier = createRoleChangeNotifier(id); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 07b6b617aa..c5524bc167 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -97,4 +97,14 @@ public interface FollowerLogInformation { * @return true if it is ok to replicate */ boolean okToReplicate(); + + /** + * Returns the payload data version of the follower. + */ + short getPayloadVersion(); + + /** + * Sets the payload data version of the follower. + */ + void setPayloadVersion(short payloadVersion); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index bcfd472bf6..5525d75b7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -26,6 +26,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); + private short payloadVersion = -1; public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { this.id = id; @@ -143,4 +144,14 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]"); return builder.toString(); } + + @Override + public short getPayloadVersion() { + return payloadVersion; + } + + @Override + public void setPayloadVersion(short payloadVersion) { + this.payloadVersion = payloadVersion; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index b1ae4587c4..5dc1b9dcdf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -114,12 +114,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); - public RaftActor(String id, Map peerAddresses) { - this(id, peerAddresses, Optional.absent()); - } - public RaftActor(String id, Map peerAddresses, - Optional configParams) { + Optional configParams, short payloadVersion) { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG), @@ -127,6 +123,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), delegatingPersistenceProvider, LOG); + context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior)); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 7198876ca6..bdb1cd93c6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -180,4 +180,5 @@ public interface RaftActorContext { @VisibleForTesting void setTotalMemoryRetriever(Supplier retriever); + short getPayloadVersion(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 049b91c416..8fa579a5e7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -51,6 +51,8 @@ public class RaftActorContextImpl implements RaftActorContext { private final DataPersistenceProvider persistenceProvider; + private short payloadVersion; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, long lastApplied, Map peerAddresses, ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) { @@ -66,6 +68,15 @@ public class RaftActorContextImpl implements RaftActorContext { this.LOG = logger; } + void setPayloadVersion(short payloadVersion) { + this.payloadVersion = payloadVersion; + } + + @Override + public short getPayloadVersion() { + return payloadVersion; + } + void setConfigParams(ConfigParams configParams) { this.configParams = configParams; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index bdfdd9b376..c2d9edd93a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -178,6 +178,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } followerLogInformation.markFollowerActive(); + followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); boolean updated = false; if (appendEntriesReply.isSuccess()) { @@ -526,7 +527,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), super.getReplicatedToAllIndex()); + context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion()); if(!entries.isEmpty() || LOG.isTraceEnabled()) { LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index c276d32cce..9e4ae6f6a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -130,7 +130,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, - lastIndex(), lastTerm()), actor() + lastIndex(), lastTerm(), context.getPayloadVersion()), actor() ); return this; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index a6722e6ff9..150f300a46 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -104,7 +104,7 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker != null) { // if snapshot install is in progress, follower should just acknowledge append entries with a reply. AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex(), lastTerm()); + lastIndex(), lastTerm(), context.getPayloadVersion()); if(LOG.isDebugEnabled()) { LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); @@ -168,7 +168,7 @@ public class Follower extends AbstractRaftActorBehavior { logName(), lastIndex, lastTerm()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, - lastTerm()), actor()); + lastTerm(), context.getPayloadVersion()), actor()); return this; } @@ -250,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior { } AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex, lastTerm()); + lastIndex, lastTerm(), context.getPayloadVersion()); if(LOG.isTraceEnabled()) { LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index d2ea0c50cd..6c1d6b47c7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -53,8 +53,10 @@ public class AppendEntries extends AbstractRaftRPC { // index which has been replicated successfully to all followers, -1 if none private final long replicatedToAllIndex; - public AppendEntries(long term, String leaderId, long prevLogIndex, - long prevLogTerm, List entries, long leaderCommit, long replicatedToAllIndex) { + private final short payloadVersion; + + public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm, + List entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) { super(term); this.leaderId = leaderId; this.prevLogIndex = prevLogIndex; @@ -62,6 +64,7 @@ public class AppendEntries extends AbstractRaftRPC { this.entries = entries; this.leaderCommit = leaderCommit; this.replicatedToAllIndex = replicatedToAllIndex; + this.payloadVersion = payloadVersion; } private void writeObject(ObjectOutputStream out) throws IOException { @@ -75,7 +78,7 @@ public class AppendEntries extends AbstractRaftRPC { } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.readShort(); // version + in.readShort(); // raft version in.defaultReadObject(); @@ -110,14 +113,17 @@ public class AppendEntries extends AbstractRaftRPC { return replicatedToAllIndex; } + public short getPayloadVersion() { + return payloadVersion; + } @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("AppendEntries [term=").append(term).append(", leaderId=").append(leaderId) - .append(", prevLogIndex=").append(prevLogIndex).append(", prevLogTerm=").append(prevLogTerm) - .append(", entries=").append(entries).append(", leaderCommit=").append(leaderCommit) - .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append("]"); + builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex) + .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit) + .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=") + .append(payloadVersion).append("]"); return builder.toString(); } @@ -208,7 +214,7 @@ public class AppendEntries extends AbstractRaftRPC { from.getPrevLogIndex(), from.getPrevLogTerm(), logEntryList, - from.getLeaderCommit(), -1); + from.getLeaderCommit(), -1, (short)0); return to; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java index 01fef006a9..990605b288 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -29,13 +29,17 @@ public class AppendEntriesReply extends AbstractRaftRPC { // responding private final String followerId; - public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) { + private final short payloadVersion; + + public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, + short payloadVersion) { super(term); this.followerId = followerId; this.success = success; this.logLastIndex = logLastIndex; this.logLastTerm = logLastTerm; + this.payloadVersion = payloadVersion; } @Override @@ -59,12 +63,16 @@ public class AppendEntriesReply extends AbstractRaftRPC { return followerId; } + public short getPayloadVersion() { + return payloadVersion; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("AppendEntriesReply [term=").append(term).append(", success=").append(success) - .append(", logLastIndex=").append(logLastIndex).append(", logLastTerm=").append(logLastTerm) - .append(", followerId=").append(followerId).append("]"); + builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex) + .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId) + .append(", payloadVersion=").append(payloadVersion).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 586ca8cda0..a6853caf9e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -70,7 +70,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { - super(id, peerAddresses, config); + super(id, peerAddresses, config, (short) 0); state = new ArrayList<>(); this.actorDelegate = mock(RaftActor.class); this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 4aa3b2fb4e..01ff6ce14f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -41,6 +41,7 @@ public class MockRaftActorContext implements RaftActorContext { private boolean snapshotCaptureInitiated; private SnapshotManager snapshotManager; private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider(); + private short payloadVersion; public MockRaftActorContext(){ electionTerm = new ElectionTerm() { @@ -232,6 +233,15 @@ public class MockRaftActorContext implements RaftActorContext { this.persistenceProvider = persistenceProvider; } + @Override + public short getPayloadVersion() { + return payloadVersion; + } + + public void setPayloadVersion(short payloadVersion) { + this.payloadVersion = payloadVersion; + } + public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl { @Override public void appendAndPersist( ReplicatedLogEntry replicatedLogEntry) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index bdb1d6052c..cca9a2508e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -547,13 +547,13 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); //fake snapshot on index 5 - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0)); assertEquals(8, leaderActor.getReplicatedLog().size()); //fake snapshot on index 6 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0)); assertEquals(8, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); @@ -584,7 +584,7 @@ public class RaftActorTest extends AbstractActorTest { new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0)); assertEquals(2, leaderActor.getReplicatedLog().size()); assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); @@ -650,7 +650,7 @@ public class RaftActorTest extends AbstractActorTest { (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("foo-6")) ); - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0)); assertEquals(7, followerActor.getReplicatedLog().size()); //fake snapshot on index 7 @@ -661,7 +661,7 @@ public class RaftActorTest extends AbstractActorTest { (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("foo-7")) ); - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0)); assertEquals(8, followerActor.getReplicatedLog().size()); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); @@ -689,7 +689,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-7")) ); // send an additional entry 8 with leaderCommit = 7 - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0)); // 7 and 8, as lastapplied is 7 assertEquals(2, followerActor.getReplicatedLog().size()); @@ -747,12 +747,12 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0)); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // set the 2nd follower nextIndex to 1 which has been snapshotted - leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0)); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); @@ -765,7 +765,7 @@ public class RaftActorTest extends AbstractActorTest { //reply from a slow follower does not initiate a fake snapshot - leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0)); assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size()); ByteString snapshotBytes = fromObject(Arrays.asList( @@ -780,7 +780,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors - leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0)); assertEquals(0, leaderActor.getReplicatedLog().size()); } }; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index f56755b447..d3f5a0eead 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -81,28 +81,31 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { */ @Test public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception { - MockRaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); + short payloadVersion = 5; + context.setPayloadVersion(payloadVersion); - // First set the receivers term to a high number (1000) - context.getTermInformation().update(1000, "test"); + // First set the receivers term to a high number (1000) + context.getTermInformation().update(1000, "test"); - AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1); + AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1, (short)4); - behavior = createBehavior(context); + behavior = createBehavior(context); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown"); + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown"); - RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries); + RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries); - assertEquals("Raft state", expected.state(), raftBehavior.state()); + assertEquals("Raft state", expected.state(), raftBehavior.state()); - // Also expect an AppendEntriesReply to be sent where success is false + // Also expect an AppendEntriesReply to be sent where success is false - AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching( - behaviorActor, AppendEntriesReply.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching( + behaviorActor, AppendEntriesReply.class); - assertEquals("isSuccess", false, reply.isSuccess()); + assertEquals("isSuccess", false, reply.isSuccess()); + assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); } @@ -119,7 +122,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { List entries = new ArrayList<>(); entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload)); - AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0); behavior = createBehavior(context); @@ -314,11 +317,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } protected AppendEntries createAppendEntriesWithNewerTerm() { - return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1); + return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1, (short)0); } protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() { - return new AppendEntriesReply("follower-1", 100, false, 100, 100); + return new AppendEntriesReply("follower-1", 100, false, 100, 100, (short)0); } protected RequestVote createRequestVoteWithNewerTerm() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 63fd530675..2dc2b21c16 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -127,7 +127,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { setupPeers(1); candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0, - Collections.emptyList(), 0, -1)); + Collections.emptyList(), 0, -1, (short)0)); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching( peerActors[0], AppendEntriesReply.class); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index c9cec15837..443ebc31a4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -48,6 +48,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { private RaftActorBehavior follower; + private final short payloadVersion = 5; + @Override @After public void tearDown() throws Exception { @@ -70,7 +72,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef){ - return new MockRaftActorContext("follower", getSystem(), actorRef); + MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); + context.setPayloadVersion(payloadVersion ); + return context; } @Test @@ -139,7 +143,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -159,7 +163,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -180,7 +184,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -209,7 +213,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -229,7 +233,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // leader-2 is becoming the leader now and it says the commitIndex is 45 - appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -250,7 +254,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -271,7 +275,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -289,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // leader-2 is becoming the leader now and it says the commitIndex is 45 - appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -323,7 +327,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -349,7 +353,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1); + AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0); follower = createBehavior(context); @@ -397,7 +401,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // before the new behavior was created (1 in this case) // This will not work for a Candidate because as soon as a Candidate // is created it increments the term - AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1); + AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1, (short)0); follower = createBehavior(context); @@ -444,7 +448,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // before the new behavior was created (1 in this case) // This will not work for a Candidate because as soon as a Candidate // is created it increments the term - AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1); + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); follower = createBehavior(context); @@ -487,7 +491,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { List entries = new ArrayList<>(); entries.add(newReplicatedLogEntry(1, 4, "four")); - AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0); follower = createBehavior(context); @@ -518,7 +522,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1)); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0)); assertEquals("Next index", 2, log.last().getIndex() + 1); assertEquals("Entry 1", entries.get(0), log.get(1)); @@ -530,7 +534,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); leaderActor.underlyingActor().clear(); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1)); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0)); assertEquals("Next index", 3, log.last().getIndex() + 1); assertEquals("Entry 1", entries.get(0), log.get(1)); @@ -558,7 +562,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { List entries = new ArrayList<>(); entries.add(newReplicatedLogEntry(1, 4, "four")); - AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0); follower = createBehavior(context); @@ -725,7 +729,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101); + AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -798,6 +802,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getFollowerId", expFollowerId, reply.getFollowerId()); assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm()); assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); + assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); } private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java index e16d765cde..0eb58d427a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java @@ -78,7 +78,7 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor, new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true, - isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1)); + isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0)); assertEquals("Raft state", RaftState.Leader, behavior.state()); @@ -87,7 +87,7 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { behavior = isolatedLeader.handleMessage(senderActor, new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true, - isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 )); + isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 )); assertEquals("Raft state", RaftState.Leader, behavior.state()); } @@ -113,13 +113,13 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { // in a 5 member cluster, atleast 2 followers need to be active and return a reply RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor, new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true, - isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 )); + isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 )); assertEquals("Raft state", RaftState.IsolatedLeader, behavior.state()); behavior = isolatedLeader.handleMessage(senderActor, new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true, - isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 )); + isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 )); assertEquals("Raft state", RaftState.Leader, behavior.state()); @@ -128,7 +128,7 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { behavior = isolatedLeader.handleMessage(senderActor, new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true, - isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 )); + isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 )); assertEquals("Raft state", RaftState.Leader, behavior.state()); } @@ -152,7 +152,7 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { // bowing itself to another leader in the cluster RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor, new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true, - isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1)); + isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1, (short)0)); assertEquals("Raft state", RaftState.Follower, behavior.state()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 0255020328..9094a65654 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -85,6 +85,8 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); + short payloadVersion = (short)5; + actorContext.setPayloadVersion(payloadVersion); long term = 1; actorContext.getTermInformation().update(term, ""); @@ -98,10 +100,11 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); // The follower would normally reply - simulate that explicitly here. leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex - 1, term)); + FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -118,6 +121,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entries size", 1, appendEntries.getEntries().size()); assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); } @@ -146,7 +150,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -192,7 +196,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -232,7 +236,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -240,7 +244,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=0;i<3;i++) { sendReplicate(actorContext, lastIndex+i+1); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex + i + 1, term)); + FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); } @@ -282,7 +286,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -327,7 +331,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -364,7 +368,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -1105,12 +1109,12 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); // Send initial heartbeat reply with last index. - leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1)); + leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0)); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals("getNextIndex", 11, followerInfo.getNextIndex()); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1); + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -1134,7 +1138,8 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1); + short payloadVersion = 5; + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -1157,6 +1162,9 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState applyState = applyStateList.get(0); assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(payloadVersion, followerInfo.getPayloadVersion()); } @Test @@ -1167,7 +1175,7 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1); + AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -1373,7 +1381,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=1;i<6;i++) { // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) - RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0)); assertTrue(newBehavior == leader); Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java new file mode 100644 index 0000000000..0f1bee5f31 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.messages; + +import static org.junit.Assert.assertEquals; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; + +/** + * Unit tests for AppendEntriesReply. + * + * @author Thomas Pantelis + */ +public class AppendEntriesReplyTest { + + @Test + public void testSerialization() { + AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6); + AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected); + + assertEquals("getTerm", expected.getTerm(), cloned.getTerm()); + assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId()); + assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm()); + assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex()); + assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion()); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java index 5f5d73dbe6..286cd3c9c4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.raft.messages; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -19,9 +21,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - /** * Unit tests for AppendEntries. * @@ -35,7 +34,9 @@ public class AppendEntriesTest { ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2")); - AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1); + short payloadVersion = 5; + AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, + -1, payloadVersion); AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected); @@ -45,7 +46,7 @@ public class AppendEntriesTest { @Test public void testToAndFromSerializable() { AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, - Collections.emptyList(), 10L, -1); + Collections.emptyList(), 10L, -1, (short)0); assertSame("toSerializable", entries, entries.toSerializable()); assertSame("fromSerializable", entries, @@ -55,7 +56,7 @@ public class AppendEntriesTest { @Test public void testToAndFromLegacySerializable() { ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload")); - AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1); + AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1, (short)0); Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION); Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries); @@ -73,6 +74,7 @@ public class AppendEntriesTest { assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex()); assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm()); assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex()); + assertEquals("getPayloadVersion", expected.getPayloadVersion(), actual.getPayloadVersion()); assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size()); Iterator iter = expected.getEntries().iterator(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 148fa1881b..14a20da247 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -116,7 +116,8 @@ public class Shard extends RaftActor { protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); + super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()), + DataStoreVersions.CURRENT_VERSION); this.name = name.toString(); this.datastoreContext = datastoreContext; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java index ce7d6303ad..98f71cad4a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java @@ -70,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest { entries.add(new ReplicatedLogImplEntry(0, 1, payload)); - assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable()); + assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1, (short)0).toSerializable()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java index 90b978821f..dc2f8c4f4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java @@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest { }); AppendEntries appendEntries = - new AppendEntries(1, "member-1", 0, 100, entries, 1, -1); + new AppendEntries(1, "member-1", 0, 100, entries, 1, -1, (short)0); AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable(RaftVersions.HELIUM_VERSION); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java index 28fc6b0f57..290e23352c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java @@ -98,7 +98,7 @@ public class Client { } }); - return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1); + return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0); } public static AppendEntries keyValueAppendEntries() { @@ -123,6 +123,6 @@ public class Client { } }); - return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1); + return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0); } } diff --git a/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java b/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java index 0b72a32f10..9915911f78 100644 --- a/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java +++ b/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java @@ -13,6 +13,7 @@ import akka.actor.UntypedActor; import akka.japi.Creator; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -101,10 +102,12 @@ public class DummyShard extends UntypedActor{ if (!ignore) { LOG.info("{} - Randomizing delay : {}", followerId, delay); Thread.sleep(delay); - sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self()); + sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(), + DataStoreVersions.CURRENT_VERSION), self()); } } else { - sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self()); + sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(), + DataStoreVersions.CURRENT_VERSION), self()); } }