X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=6565c59b5f6ad4c9639d4d001f57c76fa2872f91;hp=e19135cbcb0710f676470da9af0ded40565a48ca;hb=23b10ec4ddfdd9348c2abe7dbcfbed3b49db3dc6;hpb=f9c49de804eb7741df63d0a15889d485d65660e7 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index e19135cbcb..6565c59b5f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -65,12 +65,12 @@ import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; @@ -85,6 +85,7 @@ import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolic import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -307,9 +308,6 @@ public class RaftActorTest extends AbstractActorTest { ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2); mockRaftActor.handleRecover(applyJournalEntries); - ApplyLogEntries applyLogEntries = new ApplyLogEntries(0); - mockRaftActor.handleRecover(applyLogEntries); - DeleteEntries deleteEntries = new DeleteEntries(1); mockRaftActor.handleRecover(deleteEntries); @@ -327,7 +325,6 @@ public class RaftActorTest extends AbstractActorTest { verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class)); - verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class)); verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class)); @@ -364,11 +361,11 @@ public class RaftActorTest extends AbstractActorTest { doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class)); mockRaftActor.handleCommand(captureSnapshotReply); - SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class)); + SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L)); doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class)); mockRaftActor.handleCommand(saveSnapshotSuccess); - SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable()); + SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable()); doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class)); mockRaftActor.handleCommand(saveSnapshotFailure); @@ -442,9 +439,10 @@ public class RaftActorTest extends AbstractActorTest { ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); - mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); + final Identifier id = new MockIdentifier("apply-state"); + mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry)); - verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject()); } }; @@ -504,7 +502,7 @@ public class RaftActorTest extends AbstractActorTest { Follower follower = new Follower(raftActor.getRaftActorContext()) { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object message) { - leaderId = newLeaderId; + setLeaderId(newLeaderId); setLeaderPayloadVersion(newLeaderVersion); return this; } @@ -656,12 +654,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(), - leader, Runtime.getRuntime().totalMemory()); + Runtime.getRuntime().totalMemory()); assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader); + leaderActor.getRaftActorContext().getSnapshotManager().commit(-1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -764,7 +762,7 @@ public class RaftActorTest extends AbstractActorTest { assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower); + followerActor.getRaftActorContext().getSnapshotManager().commit(-1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -843,7 +841,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // simulate a real snapshot - leaderActor.onReceiveCommand(new SendHeartBeat()); + leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ", leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()) @@ -948,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // Persist another entry (this will cause a CaptureSnapshot to be triggered - leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + leaderActor.persistData(mockActorRef, new MockIdentifier("x"), + new MockRaftActorContext.MockPayload("duh")); // Now send a CaptureSnapshotReply mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); @@ -961,40 +960,6 @@ public class RaftActorTest extends AbstractActorTest { }}; } - @Test - public void testRaftActorOnRecoverySnapshot() throws Exception { - TEST_LOG.info("testRaftActorOnRecoverySnapshot"); - - new JavaTestKit(getSystem()) {{ - String persistenceId = factory.generateActorId("follower-"); - - DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); - - // Set the heartbeat interval high to essentially disable election otherwise the test - // may fail if the actor is switched to Leader - config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - - ImmutableMap peerAddresses = ImmutableMap.builder().put("member1", "address").build(); - - // Create mock ReplicatedLogEntry - ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1, - new MockRaftActorContext.MockPayload("F", 1)); - - InMemoryJournal.addEntry(persistenceId, 1, replLogEntry); - - TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, config)); - - MockRaftActor mockRaftActor = ref.underlyingActor(); - - mockRaftActor.waitForRecoveryComplete(); - - mockRaftActor.waitForInitializeBehaviorComplete(); - - verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class)); - }}; - } - @Test public void testSwitchBehavior(){ String persistenceId = factory.generateActorId("leader-"); @@ -1132,9 +1097,6 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForRecoveryComplete(); - // Wait for snapshot after recovery - verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class)); - mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef()); @@ -1294,7 +1256,8 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForRecoveryComplete(); - verify(mockRaftActor.snapshotCohortDelegate, timeout(500).never()).applySnapshot(any(byte[].class)); + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class)); RaftActorContext context = mockRaftActor.getRaftActorContext(); assertEquals("Journal log size", 1, context.getReplicatedLog().size()); @@ -1332,4 +1295,39 @@ public class RaftActorTest extends AbstractActorTest { TEST_LOG.info("testNonVotingOnRecovery ending"); } + + @Test + public void testLeaderTransitioning() throws Exception { + TEST_LOG.info("testLeaderTransitioning starting"); + + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + String persistenceId = factory.generateActorId("test-actor-"); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForInitializeBehaviorComplete(); + + raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.emptyList(), + 0L, -1L, (short)1), ActorRef.noSender()); + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId()); + + MessageCollectorActor.clearMessages(notifierActor); + + raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender()); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId()); + assertEquals("getLeaderId", null, leaderStateChange.getLeaderId()); + + TEST_LOG.info("testLeaderTransitioning ending"); + } }