X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=fd4a75a22f735c06d2ab8042eb13d6e92de572a0;hp=90acbb1eae08db0eb08e485e7dcd5e1860a80998;hb=5ba5e3de7c3c645ceac29c642fee928496a49fc4;hpb=7be62e955c32ff7fa10753c4307199b287b1904c diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 90acbb1eae..fd4a75a22f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -3,17 +3,44 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; +import akka.util.Timeout; +import com.google.protobuf.ByteString; import junit.framework.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; - +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static akka.pattern.Patterns.ask; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class FollowerTest extends AbstractRaftActorBehaviorTest { @@ -25,20 +52,24 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { return new Follower(actorContext); } - @Override protected RaftActorContext createActorContext() { - return new MockRaftActorContext("test", getSystem(), followerActor); + @Override protected RaftActorContext createActorContext() { + return createActorContext(followerActor); + } + + protected RaftActorContext createActorContext(ActorRef actorRef){ + return new MockRaftActorContext("test", getSystem(), actorRef); } @Test public void testThatAnElectionTimeoutIsTriggered(){ new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { + new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { protected void run() { Follower follower = new Follower(createActorContext(getTestActor())); - final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { + final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { // do not put code outside this method, will run afterwards protected Boolean match(Object in) { if (in instanceof ElectionTimeout) { @@ -133,4 +164,416 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { }}; } + /** + * This test verifies that when an AppendEntries RPC is received by a RaftActor + * with a commitIndex that is greater than what has been applied to the + * state machine of the RaftActor, the RaftActor applies the state and + * sets it current applied state to the commitIndex of the sender. + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { + new JavaTestKit(getSystem()) {{ + + RaftActorContext context = + createActorContext(); + + context.setLastApplied(100); + setLastLogEntry((MockRaftActorContext) context, 1, 100, + new MockRaftActorContext.MockPayload("")); + ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99); + + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101, + new MockRaftActorContext.MockPayload("foo")) + ); + + // The new commitIndex is 101 + AppendEntries appendEntries = + new AppendEntries(2, "leader-1", 100, 1, entries, 101); + + RaftState raftState = + createBehavior(context).handleMessage(getRef(), appendEntries); + + assertEquals(101L, context.getLastApplied()); + + }}; + } + + /** + * This test verifies that when an AppendEntries is received a specific prevLogTerm + * which does not match the term that is in RaftActors log entry at prevLogIndex + * then the RaftActor does not change it's state and it returns a failure. + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() + throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(95, "test"); + + // Set the last log entry term for the receiver to be greater than + // what we will be sending as the prevLogTerm in AppendEntries + MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog = + setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload("")); + + // AppendEntries is now sent with a bigger term + // this will set the receivers term to be the same as the sender's term + AppendEntries appendEntries = + new AppendEntries(100, "leader-1", 0, 0, null, 101); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + + + }}; + } + + + + /** + * This test verifies that when a new AppendEntries message is received with + * new entries and the logs of the sender and receiver match that the new + * entries get added to the log and the log is incremented by the number of + * entries received in appendEntries + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesAddNewEntries() throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three"))); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four"))); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = + new AppendEntries(1, "leader-1", 2, 1, entries, 4); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + assertEquals(5, log.last().getIndex() + 1); + assertNotNull(log.get(3)); + assertNotNull(log.get(4)); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + + + }}; + } + + + + /** + * This test verifies that when a new AppendEntries message is received with + * new entries and the logs of the sender and receiver are out-of-sync that + * the log is first corrected by removing the out of sync entries from the + * log and then adding in the new entries sent with the AppendEntries message + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesCorrectReceiverLogEntries() + throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(2, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1"))); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three"))); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = + new AppendEntries(2, "leader-1", 1, 1, entries, 3); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + // The entry at index 2 will be found out-of-sync with the leader + // and will be removed + // Then the two new entries will be added to the log + // Thus making the log to have 4 entries + assertEquals(4, log.last().getIndex() + 1); + assertNotNull(log.get(2)); + + assertEquals("one", log.get(1).getData().toString()); + + // Check that the entry at index 2 has the new data + assertEquals("two-1", log.get(2).getData().toString()); + + assertEquals("three", log.get(3).getData().toString()); + + assertNotNull(log.get(3)); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + + + }}; + } + + + /** + * This test verifies that when InstallSnapshot is received by + * the follower its applied correctly. + * + * @throws Exception + */ + @Test + public void testHandleInstallSnapshot() throws Exception { + JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{ + + ActorRef leaderActor = getSystem().actorOf(Props.create( + MessageCollectorActor.class)); + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(getRef()); + + Follower follower = (Follower)createBehavior(context); + + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString chunkData = ByteString.EMPTY; + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int i = 1; + + do { + chunkData = getNextChunk(bsSnapshot, offset); + final InstallSnapshot installSnapshot = + new InstallSnapshot(1, "leader-1", i, 1, + chunkData, i, 3); + follower.handleMessage(leaderActor, installSnapshot); + offset = offset + 50; + i++; + } while ((offset+50) < snapshotLength); + + final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3); + follower.handleMessage(leaderActor, installSnapshot3); + + String[] matches = new ReceiveWhile(String.class, duration("2 seconds")) { + @Override + protected String match(Object o) throws Exception { + if (o instanceof ApplySnapshot) { + ApplySnapshot as = (ApplySnapshot)o; + if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) { + return "applySnapshot-lastIndex-mismatch"; + } + if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) { + return "applySnapshot-lastAppliedTerm-mismatch"; + } + if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) { + return "applySnapshot-lastAppliedIndex-mismatch"; + } + if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) { + return "applySnapshot-lastTerm-mismatch"; + } + return "applySnapshot"; + } + + return "ignoreCase"; + } + }.get(); + + String applySnapshotMatch = ""; + for (String reply: matches) { + if (reply.startsWith("applySnapshot")) { + applySnapshotMatch = reply; + } + } + + assertEquals("applySnapshot", applySnapshotMatch); + + Object messages = executeLocalOperation(leaderActor, "get-all-messages"); + + assertNotNull(messages); + assertTrue(messages instanceof List); + List listMessages = (List) messages; + + int installSnapshotReplyReceivedCount = 0; + for (Object message: listMessages) { + if (message instanceof InstallSnapshotReply) { + ++installSnapshotReplyReceivedCount; + } + } + + assertEquals(3, installSnapshotReplyReceivedCount); + + }}; + } + + public Object executeLocalOperation(ActorRef actor, Object message) throws Exception { + FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); + Timeout operationTimeout = new Timeout(operationDuration); + Future future = ask(actor, message, operationTimeout); + + try { + return Await.result(future, operationDuration); + } catch (Exception e) { + throw e; + } + } + + public ByteString getNextChunk (ByteString bs, int offset){ + int snapshotLength = bs.size(); + int start = offset; + int size = 50; + if (50 > snapshotLength) { + size = snapshotLength; + } else { + if ((start + 50) > snapshotLength) { + size = snapshotLength - start; + } + } + return bs.substring(start, start + size); + } + + private ByteString toByteString(Map state) { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(state); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } catch (IOException e) { + org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e); + } + return null; + } }