X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorServerConfigurationSupportTest.java;h=5c9ba13b8beda3e439df5dce505f2d96c747e1be;hb=3fda1a923defdbf18849c6080c3aa19f1ebf2c5f;hp=538681e8df1eff6ba0c061fbd31f1bcae0390b61;hpb=4e186d6e4a9c84482dc74aee353e12a12f6728a7;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 538681e8df..5c9ba13b8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -7,7 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; -//import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertEquals; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; import akka.actor.ActorRef; @@ -19,7 +19,9 @@ import akka.testkit.TestActorRef; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.Collections; +import java.util.List; import java.util.Map; //import java.util.List; import java.util.concurrent.TimeUnit; @@ -28,20 +30,26 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.messages.AddServer; -//import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -//import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; -//import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; + /** * Unit tests for RaftActorServerConfigurationSupport. * @@ -60,23 +68,29 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(FOLLOWER_ID)); - private final TestActorRef newServerActor = actorFactory.createTestActor( - Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId(NEW_SERVER_ID)); + private TestActorRef newFollowerRaftActor; + private TestActorRef newFollowerCollectorActor; - private RaftActorContext newServerActorContext; + private RaftActorContext newFollowerActorContext; private final JavaTestKit testKit = new JavaTestKit(getSystem()); @Before public void setup() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); - newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(), - NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1, - Maps.newHashMap(), configParams, NO_PERSISTENCE, LOG); - newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + newFollowerCollectorActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(NEW_SERVER_ID + "Collector")); + newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props( + configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(NEW_SERVER_ID)); + newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext(); } @After @@ -85,19 +99,16 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { } @Test - public void testAddServerWithFollower() throws Exception { + public void testAddServerWithExistingFollower() throws Exception { RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor); followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries( 0, 3, 1).build()); - followerActorContext.setCommitIndex(3); - followerActorContext.setLastApplied(3); + followerActorContext.setCommitIndex(2); + followerActorContext.setLastApplied(2); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); - Follower newServer = new Follower(newServerActorContext); - newServerActor.underlyingActor().setBehavior(newServer); - TestActorRef leaderActor = actorFactory.createTestActor( MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -109,33 +120,173 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); - leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + // Leader should install snapshot - capture and verify ApplySnapshot contents + + ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class); + List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); + assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + // Verify ServerConfigurationPayload entry in leader's log + + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + + // Verify ServerConfigurationPayload entry in both followers + + assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + + assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + + // Verify new server config was applied in both followers + + assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), + followerActorContext.getPeerAddresses().keySet()); + + assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), + newFollowerActorContext.getPeerAddresses().keySet()); + + clearMessages(followerActor); + clearMessages(newFollowerCollectorActor); + + expectFirstMatching(newFollowerCollectorActor, ApplyState.class); + expectFirstMatching(followerActor, ApplyState.class); + + assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex()); + assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied()); + assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex()); + assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied()); + } + + @Test + public void testAddServerWithNoExistingFollower() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(1); + initialActorContext.setLastApplied(1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries( + 0, 2, 1).build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + // Leader should install snapshot - capture and verify ApplySnapshot contents + + ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class); + List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); + assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + // Verify ServerConfigurationPayload entry in leader's log + + assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify ServerConfigurationPayload entry in the new follower + + clearMessages(newFollowerCollectorActor); + + expectFirstMatching(newFollowerCollectorActor, ApplyState.class); + assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify new server config was applied in the new follower + + assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), + newFollowerActorContext.getPeerAddresses().keySet()); + } + + @Test + public void testAddServerAsNonVoting() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef()); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + // Verify ServerConfigurationPayload entry in leader's log + + assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify ServerConfigurationPayload entry in the new follower + + expectFirstMatching(newFollowerCollectorActor, ApplyState.class); + assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify new server config was applied in the new follower + + assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), + newFollowerActorContext.getPeerAddresses().keySet()); + + MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500); + } + + @Test + public void testAddServerWithInstallSnapshotTimeout() throws Exception { + newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS); - // leader should install snapshot - capture and verify ApplySnapshot contents - //ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class); - //List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); - //assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); - // leader should replicate new server config to both followers - //expectFirstMatching(followerActor, AppendEntries.class); - //expectFirstMatching(newServerActor, AppendEntries.class); + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); - // verify ServerConfigurationPayload entry in leader's log + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); - //assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size()); - //assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex()); - ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get( - leaderActorContext.getReplicatedLog().lastIndex()); - // verify logEntry contents - // Also verify ServerConfigurationPayload entry in both followers + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); - //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); - //assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); - //assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus()); + + assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size()); + assertEquals("Leader followers size", 0, + ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size()); } - //@Test + @Test public void testAddServerWithNoLeader() { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -146,12 +297,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { actorFactory.generateActorId(LEADER_ID)); noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete(); - noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); - //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); - //assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); + noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); } - //@Test + @Test public void testAddServerForwardedToLeader() { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -169,16 +320,25 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.emptyList(), -1, -1, (short)0), leaderActor); - followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); - //expectFirstMatching(leaderActor, AddServer.class); + followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + expectFirstMatching(leaderActor, AddServer.class); + } + + private void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) { + ReplicatedLogEntry logEntry = log.get(log.lastIndex()); + assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass()); + ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData(); + assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig())); } private RaftActorContext newFollowerContext(String id, TestActorRef actor) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); + ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG); + termInfo.update(1, LEADER_ID); RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), - id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1, + id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG); return followerActorContext; @@ -198,6 +358,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { context.setCommitIndex(fromContext.getCommitIndex()); context.setLastApplied(fromContext.getLastApplied()); + context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(), + fromContext.getTermInformation().getVotedFor()); } @Override @@ -217,9 +379,37 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { static Props props(Map peerAddresses, RaftActorContext fromContext) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - configParams.setElectionTimeoutFactor(100000); + configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(1); return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext); } } + + public static class MockNewFollowerRaftActor extends MockRaftActor { + private final TestActorRef collectorActor; + private volatile Class dropMessageOfType; + + public MockNewFollowerRaftActor(ConfigParams config, TestActorRef collectorActor) { + super(NEW_SERVER_ID, Maps.newHashMap(), Optional.of(config), null); + this.collectorActor = collectorActor; + } + + void setDropMessageOfType(Class dropMessageOfType) { + this.dropMessageOfType = dropMessageOfType; + } + + @Override + public void handleCommand(Object message) { + if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) { + return; + } + + super.handleCommand(message); + collectorActor.tell(message, getSender()); + } + + static Props props(ConfigParams config, TestActorRef collectorActor) { + return Props.create(MockNewFollowerRaftActor.class, config, collectorActor); + } + } }