From: Kamal Rameshan Date: Sat, 20 Sep 2014 00:43:32 +0000 (-0700) Subject: Bug-1978:Leader initializes commit index to last index X-Git-Tag: release/helium~27^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d76829d104b8024934f5ba0950806016782c9ffb Bug-1978:Leader initializes commit index to last index Change-Id: Ic42a14911e8ff4db40fde2a917c8cb2edf412c76 Signed-off-by: Kamal Rameshan Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 199d2d61cf..ff8a2256d3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -88,16 +88,12 @@ public class Leader extends AbstractRaftActorBehavior { LOG = context.getLogger(); - if (lastIndex() >= 0) { - context.setCommitIndex(lastIndex()); - } - followers = context.getPeerAddresses().keySet(); for (String followerId : followers) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, - new AtomicLong(lastIndex()), + new AtomicLong(context.getCommitIndex()), new AtomicLong(-1)); followerToLog.put(followerId, followerLogInformation); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index ca34a34ca4..0d5f644b3d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -200,6 +200,10 @@ public class MockRaftActorContext implements RaftActorContext { public static class MockPayload extends Payload implements Serializable { private String value = ""; + public MockPayload(){ + + } + public MockPayload(String s) { this.value = s; } @@ -251,4 +255,24 @@ public class MockRaftActorContext implements RaftActorContext { return index; } } + + public static class MockReplicatedLogBuilder { + private ReplicatedLog mockLog = new SimpleReplicatedLog(); + + public MockReplicatedLogBuilder createEntries(int start, int end, int term) { + for (int i=start; i future = ask(actor, message, operationTimeout); - - try { - return Await.result(future, operationDuration); - } catch (Exception e) { - throw e; - } + return MessageCollectorActor.getAllMessages(actor); } public ByteString getNextChunk (ByteString bs, int offset){ diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index c4ef51d968..19af64790f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -20,9 +20,12 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import java.io.ByteArrayOutputStream; @@ -171,18 +174,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { actorContext.getReplicatedLog().removeFrom(0); - actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1, - new MockRaftActorContext.MockPayload("foo"))); - - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(1, 1, - new MockRaftActorContext.MockPayload("foo")); - - actorContext.getReplicatedLog().append(entry); + actorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1) + .build()); Leader leader = new Leader(actorContext); RaftState raftState = leader - .handleMessage(senderActor, new Replicate(null, "state-id",entry)); + .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1))); // State should not change assertEquals(RaftState.Leader, raftState); @@ -335,7 +333,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); - RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot()); assertEquals(RaftState.Leader, raftState); @@ -526,6 +523,157 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { return null; } + public static class ForwardMessageToBehaviorActor extends MessageCollectorActor { + private static AbstractRaftActorBehavior behavior; + + public ForwardMessageToBehaviorActor(){ + + } + + @Override public void onReceive(Object message) throws Exception { + super.onReceive(message); + behavior.handleMessage(sender(), message); + } + + public static void setBehavior(AbstractRaftActorBehavior behavior){ + ForwardMessageToBehaviorActor.behavior = behavior; + } + } + + @Test + public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { + new JavaTestKit(getSystem()) {{ + + ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class)); + + MockRaftActorContext followerActorContext = + new MockRaftActorContext("follower", getSystem(), followerActor); + + Follower follower = new Follower(followerActorContext); + + ForwardMessageToBehaviorActor.setBehavior(follower); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + + leaderActorContext.getReplicatedLog().removeFrom(0); + + //create 3 entries + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + leaderActorContext.setCommitIndex(1); + + followerActorContext.getReplicatedLog().removeFrom(0); + + // follower too has the exact same log entries and has the same commit index + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + followerActorContext.setCommitIndex(1); + + Leader leader = new Leader(leaderActorContext); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + AppendEntriesMessages.AppendEntries appendEntries = + (AppendEntriesMessages.AppendEntries) MessageCollectorActor + .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class); + + assertNotNull(appendEntries); + + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(1, appendEntries.getLogEntries(0).getIndex()); + assertEquals(0, appendEntries.getPrevLogIndex()); + + AppendEntriesReply appendEntriesReply = + (AppendEntriesReply) MessageCollectorActor.getFirstMatching( + leaderActor, AppendEntriesReply.class); + + assertNotNull(appendEntriesReply); + + // follower returns its next index + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); + + }}; + } + + + @Test + public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { + new JavaTestKit(getSystem()) {{ + + ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + ActorRef followerActor = getSystem().actorOf( + Props.create(ForwardMessageToBehaviorActor.class)); + + MockRaftActorContext followerActorContext = + new MockRaftActorContext("follower", getSystem(), followerActor); + + Follower follower = new Follower(followerActorContext); + + ForwardMessageToBehaviorActor.setBehavior(follower); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + + leaderActorContext.getReplicatedLog().removeFrom(0); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + leaderActorContext.setCommitIndex(1); + + followerActorContext.getReplicatedLog().removeFrom(0); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + // follower has the same log entries but its commit index > leaders commit index + followerActorContext.setCommitIndex(2); + + Leader leader = new Leader(leaderActorContext); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + AppendEntriesMessages.AppendEntries appendEntries = + (AppendEntriesMessages.AppendEntries) MessageCollectorActor + .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class); + + assertNotNull(appendEntries); + + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(1, appendEntries.getLogEntries(0).getIndex()); + assertEquals(0, appendEntries.getPrevLogIndex()); + + AppendEntriesReply appendEntriesReply = + (AppendEntriesReply) MessageCollectorActor.getFirstMatching( + leaderActor, AppendEntriesReply.class); + + assertNotNull(appendEntriesReply); + + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); + + }}; + } + private static class LeaderTestKit extends JavaTestKit { private LeaderTestKit(ActorSystem actorSystem) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 88eecfe705..58928453b4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -8,10 +8,18 @@ package org.opendaylight.controller.cluster.raft.utils; +import akka.actor.ActorRef; import akka.actor.UntypedActor; +import akka.pattern.Patterns; +import akka.util.Timeout; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; public class MessageCollectorActor extends UntypedActor { @@ -26,4 +34,35 @@ public class MessageCollectorActor extends UntypedActor { messages.add(message); } } + + public static List getAllMessages(ActorRef actor) throws Exception { + FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); + Timeout operationTimeout = new Timeout(operationDuration); + Future future = Patterns.ask(actor, "get-all-messages", operationTimeout); + + try { + return (List) Await.result(future, operationDuration); + } catch (Exception e) { + throw e; + } + } + + /** + * Get the first message that matches the specified class + * @param actor + * @param clazz + * @return + */ + public static Object getFirstMatching(ActorRef actor, Class clazz) throws Exception { + List allMessages = getAllMessages(actor); + + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + return message; + } + } + + return null; + } + }