X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=0a4a2c7717facfcc9fc883c2234d4577ff49f876;hb=95115ca49f3b16b936e0f6c88aedfc17cd0ee92c;hp=b192b7cd242918a6735a85ad5a22e913493578ac;hpb=753515e8868a1a15982d3f2697439f522f273db5;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index b192b7cd24..0a4a2c7717 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -31,6 +31,7 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -54,16 +55,17 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -157,6 +159,16 @@ public class RaftActorTest extends AbstractActorTest { } } + + public void waitUntilLeader(){ + for(int i = 0;i < 10; i++){ + if(isLeader()){ + break; + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + public List getState() { return state; } @@ -176,6 +188,13 @@ public class RaftActorTest extends AbstractActorTest { return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier, + DataPersistenceProvider dataPersistenceProvider){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called", persistenceId()); @@ -673,11 +692,13 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); - verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class)); } }; } @@ -701,9 +722,11 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); - verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); } @@ -736,10 +759,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); - RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, -1, + new MockRaftActorContext.MockPayload("D")), -1); + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -763,17 +788,18 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + ImmutableMap.of("leader", "fake/path"), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(lastEntry); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -785,7 +811,8 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); long replicatedToAllIndex = 1; - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); + + mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); verify(mockRaftActor.delegate).createSnapshot(); @@ -927,7 +954,9 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("D")), 1); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -942,7 +971,83 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testRaftRoleChangeNotifier() throws Exception { + public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception { + new JavaTestKit(getSystem()) {{ + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(20); + + String persistenceId = factory.generateActorId("notifier-"); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), notifierActor, + new NonPersistentProvider()), persistenceId); + + List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); + + + // check if the notifier got a role change from null to Follower + RoleChanged raftRoleChanged = matches.get(0); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertNull(raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Follower to Candidate + raftRoleChanged = matches.get(1); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Candidate to Leader + raftRoleChanged = matches.get(2); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + + assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + + notifierActor.underlyingActor().clear(); + + MockRaftActor raftActor = raftActorRef.underlyingActor(); + final String newLeaderId = "new-leader"; + Follower follower = new Follower(raftActor.getRaftActorContext()) { + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + leaderId = newLeaderId; + return this; + } + }; + + raftActor.changeCurrentBehavior(follower); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(null, leaderStateChange.getLeaderId()); + + raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(newLeaderId, leaderStateChange.getLeaderId()); + }}; + } + + @Test + public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception { new JavaTestKit(getSystem()) {{ ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); MessageCollectorActor.waitUntilReady(notifierActor); @@ -954,8 +1059,8 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); + factory.createActor(MockRaftActor.props(persistenceId, + ImmutableMap.of("leader", "fake/path"), Optional.of(config), notifierActor), persistenceId); List matches = null; for(int i = 0; i < 5000 / heartBeatInterval; i++) { @@ -967,7 +1072,7 @@ public class RaftActorTest extends AbstractActorTest { Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); } - assertEquals(3, matches.size()); + assertEquals(2, matches.size()); // check if the notifier got a role change from null to Follower RoleChanged raftRoleChanged = matches.get(0); @@ -981,11 +1086,6 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); - // check if the notifier got a role change from Candidate to Leader - raftRoleChanged = matches.get(2); - assertEquals(persistenceId, raftRoleChanged.getMemberId()); - assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); - assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); }}; } @@ -1031,9 +1131,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().getSnapshotManager() + .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("x")), 4); - leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1059,8 +1160,14 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider() + , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); + + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1123,9 +1230,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("D")), 4); - followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -1160,7 +1268,10 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -1258,7 +1369,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); @@ -1342,7 +1453,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(-1, leader.getReplicatedToAllIndex()); }}; @@ -1385,7 +1496,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(3, leader.getReplicatedToAllIndex()); }};