X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=eb70c8b028e16258c392e79a3394d040f9c7c291;hb=55e018bfad0c70b773641142d6fbf009cd67fda4;hp=5062f8f6e0c6d5875e949dcc38afeda5297f5827;hpb=2d62916cb1f4b4045f4fc38fbd313f8339f9ac67;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 5062f8f6e0..eb70c8b028 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -43,7 +43,6 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; -import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; @@ -52,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -60,10 +60,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; public class RaftActorTest extends AbstractActorTest { + static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class); + private TestActorFactory factory; @Before @@ -91,6 +95,8 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception { + TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting"); + new JavaTestKit(getSystem()) {{ String persistenceId = factory.generateActorId("follower-"); @@ -101,9 +107,9 @@ public class RaftActorTest extends AbstractActorTest { // log entry. config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + ImmutableMap peerAddresses = ImmutableMap.builder().put("member1", "address").build(); ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId, - ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config)), persistenceId); + peerAddresses, Optional.of(config)), persistenceId); watch(followerActor); @@ -156,8 +162,7 @@ public class RaftActorTest extends AbstractActorTest { //reinstate the actor TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, Collections.emptyMap(), - Optional.of(config))); + MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config))); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -176,6 +181,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState()); }}; + + TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending"); } @Test @@ -244,6 +251,10 @@ public class RaftActorTest extends AbstractActorTest { UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2"); mockRaftActor.handleRecover(updateElectionTerm); + org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm = + new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3"); + mockRaftActor.handleRecover(deprecatedUpdateElectionTerm); + verify(mockSupport).handleRecoveryMessage(same(snapshotOffer)); verify(mockSupport).handleRecoveryMessage(same(logEntry)); verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries)); @@ -251,6 +262,7 @@ public class RaftActorTest extends AbstractActorTest { verify(mockSupport).handleRecoveryMessage(same(deleteEntries)); verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries)); verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm)); + verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm)); } @Test @@ -275,7 +287,7 @@ public class RaftActorTest extends AbstractActorTest { doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot)); mockRaftActor.handleCommand(applySnapshot); - CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1); + CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null); doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot)); mockRaftActor.handleCommand(captureSnapshot); @@ -406,15 +418,18 @@ public class RaftActorTest extends AbstractActorTest { notifierActor, LeaderStateChanged.class); assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion()); notifierActor.underlyingActor().clear(); MockRaftActor raftActor = raftActorRef.underlyingActor(); final String newLeaderId = "new-leader"; + final short newLeaderVersion = 6; Follower follower = new Follower(raftActor.getRaftActorContext()) { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object message) { leaderId = newLeaderId; + setLeaderPayloadVersion(newLeaderVersion); return this; } }; @@ -436,6 +451,15 @@ public class RaftActorTest extends AbstractActorTest { leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); assertEquals(persistenceId, leaderStateChange.getMemberId()); assertEquals(newLeaderId, leaderStateChange.getLeaderId()); + assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS); + leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class); + assertNull(leaderStateChange); }}; } @@ -465,6 +489,7 @@ public class RaftActorTest extends AbstractActorTest { Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); } + assertNotNull(matches); assertEquals(2, matches.size()); // check if the notifier got a role change from null to Follower @@ -534,13 +559,13 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); //fake snapshot on index 5 - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0)); assertEquals(8, leaderActor.getReplicatedLog().size()); //fake snapshot on index 6 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0)); assertEquals(8, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); @@ -554,13 +579,13 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider() - , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); + leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(), + leader, Runtime.getRuntime().totalMemory()); assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); + leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -571,7 +596,7 @@ public class RaftActorTest extends AbstractActorTest { new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0)); assertEquals(2, leaderActor.getReplicatedLog().size()); assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); @@ -637,7 +662,7 @@ public class RaftActorTest extends AbstractActorTest { (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("foo-6")) ); - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0)); assertEquals(7, followerActor.getReplicatedLog().size()); //fake snapshot on index 7 @@ -648,7 +673,7 @@ public class RaftActorTest extends AbstractActorTest { (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("foo-7")) ); - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0)); assertEquals(8, followerActor.getReplicatedLog().size()); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); @@ -664,7 +689,7 @@ public class RaftActorTest extends AbstractActorTest { assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); + followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -676,7 +701,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-7")) ); // send an additional entry 8 with leaderCommit = 7 - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0)); // 7 and 8, as lastapplied is 7 assertEquals(2, followerActor.getReplicatedLog().size()); @@ -734,12 +759,12 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0)); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // set the 2nd follower nextIndex to 1 which has been snapshotted - leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0)); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); @@ -752,7 +777,7 @@ public class RaftActorTest extends AbstractActorTest { //reply from a slow follower does not initiate a fake snapshot - leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0)); assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size()); ByteString snapshotBytes = fromObject(Arrays.asList( @@ -767,7 +792,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors - leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0)); assertEquals(0, leaderActor.getReplicatedLog().size()); } }; @@ -784,7 +809,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); - Map peerAddresses = new HashMap<>(); + Map peerAddresses = ImmutableMap.builder().put("member1", "address").build(); TestActorRef mockActorRef = factory.createTestActor( MockRaftActor.props(persistenceId, peerAddresses, @@ -831,7 +856,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); - Map peerAddresses = new HashMap<>(); + Map peerAddresses = ImmutableMap.builder().put("member1", "address").build(); TestActorRef mockActorRef = factory.createTestActor( MockRaftActor.props(persistenceId, peerAddresses, @@ -863,7 +888,7 @@ public class RaftActorTest extends AbstractActorTest { }}; } - private ByteString fromObject(Object snapshot) throws Exception { + public static ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; try {