X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=83868b6a2ad187a257bc3323479b8d84974276a5;hp=56bf6200dc4d328aec1a35b6a35470609fa5cc7e;hb=ab43a3c309d46978f6fae340c9c2ef0b2755891c;hpb=c64ef5f44f131976c20fcf8ced56627f81091838 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 56bf6200dc..83868b6a2a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -60,7 +60,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; -import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -552,7 +552,6 @@ public class RaftActorTest extends AbstractActorTest { assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - }}; } @@ -576,12 +575,12 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar"); assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); - } - }; } @@ -602,14 +601,14 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)); mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry); verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class)); - } - }; } @@ -630,14 +629,14 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); - } - }; } @@ -658,6 +657,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.onReceiveCommand(new ApplyLogEntries(10)); verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); @@ -685,13 +686,15 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); @@ -722,6 +725,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); @@ -737,7 +742,8 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); + long replicatedToAllIndex = 1; + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); verify(mockRaftActor.delegate).createSnapshot(); @@ -749,13 +755,16 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).deleteMessages(100); - assertEquals(2, mockRaftActor.getReplicatedLog().size()); + assertEquals(3, mockRaftActor.getReplicatedLog().size()); + assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex()); + assertNotNull(mockRaftActor.getReplicatedLog().get(2)); assertNotNull(mockRaftActor.getReplicatedLog().get(3)); assertNotNull(mockRaftActor.getReplicatedLog().get(4)); // Index 2 will not be in the log because it was removed due to snapshotting - assertNull(mockRaftActor.getReplicatedLog().get(2)); + assertNull(mockRaftActor.getReplicatedLog().get(1)); + assertNull(mockRaftActor.getReplicatedLog().get(0)); } }; @@ -779,6 +788,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); @@ -807,6 +818,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog(); oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); @@ -860,6 +873,8 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.waitForInitializeBehaviorComplete(); + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), @@ -870,7 +885,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -888,33 +903,44 @@ public class RaftActorTest extends AbstractActorTest { public void testRaftRoleChangeNotifier() throws Exception { new JavaTestKit(getSystem()) {{ ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(1); + String persistenceId = factory.generateActorId("notifier-"); factory.createTestActor(MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); - // sleeping for a minimum of 2 seconds, if it spans more its fine. - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + List matches = null; + for(int i = 0; i < 5000 / heartBeatInterval; i++) { + matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + if(matches.size() == 3) { + break; + } + Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); + } - List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); - assertNotNull(matches); assertEquals(3, matches.size()); // check if the notifier got a role change from null to Follower - RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); + RoleChanged raftRoleChanged = matches.get(0); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertNull(raftRoleChanged.getOldRole()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Follower to Candidate - raftRoleChanged = (RoleChanged) matches.get(1); + raftRoleChanged = matches.get(1); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Candidate to Leader - raftRoleChanged = (RoleChanged) matches.get(2); + raftRoleChanged = matches.get(2); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); @@ -940,11 +966,12 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = new HashMap<>(); peerAddresses.put(follower1Id, followerActor1.path().toString()); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), + TestActorRef mockActorRef = factory.createTestActor( MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(4); leaderActor.getRaftActorContext().setLastApplied(4); leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); @@ -962,7 +989,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1)); + leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); @@ -1029,7 +1057,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = new HashMap<>(); peerAddresses.put(leaderId, leaderActor1.path().toString()); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), + TestActorRef mockActorRef = factory.createTestActor( MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config), dataPersistenceProvider), persistenceId); @@ -1040,19 +1068,21 @@ public class RaftActorTest extends AbstractActorTest { followerActor.waitForInitializeBehaviorComplete(); - // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); followerActor.setCurrentBehavior(follower); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); - // log as indices 0-5 + // log has indices 0-5 assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1)); + followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); @@ -1090,7 +1120,7 @@ public class RaftActorTest extends AbstractActorTest { followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); - // capture snapshot reply should remove the snapshotted entries only + // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log assertEquals(7, followerActor.getReplicatedLog().lastIndex()); @@ -1150,19 +1180,25 @@ public class RaftActorTest extends AbstractActorTest { // create 5 entries in the log MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build()); + //set the snapshot index to 4 , 0 to 4 are snapshotted leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); + //setting replicatedToAllIndex = 9, for the log to clear + leader.setReplicatedToAllIndex(9); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // set the 2nd follower nextIndex to 1 which has been snapshotted leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // simulate a real snapshot - leaderActor.onReceiveCommand(new InitiateInstallSnapshot()); + leaderActor.onReceiveCommand(new SendHeartBeat()); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ", leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()) @@ -1182,7 +1218,7 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); - assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size()); + assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));