X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=f71cb984b3e414bf879669cc64535e2d3d64c78c;hb=ebaf3d71465066033d5882c61cdd2ec63b29d980;hp=b93b73958baede1e6e7e51560bed0386d4587a65;hpb=cbf0c2ef5f3fb7ea6367ac98f580337840952a05;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index b93b73958b..f71cb984b3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -31,6 +31,7 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -48,14 +49,17 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; @@ -94,10 +98,10 @@ public class RaftActorTest extends AbstractActorTest { InMemorySnapshotStore.clear(); } - public static class MockRaftActor extends RaftActor { + public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort { - protected DataPersistenceProvider dataPersistenceProvider; - private final RaftActor delegate; + private final RaftActor actorDelegate; + private final RaftActorRecoveryCohort cohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; @@ -134,11 +138,12 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); - this.delegate = mock(RaftActor.class); + this.actorDelegate = mock(RaftActor.class); + this.cohortDelegate = mock(RaftActorRecoveryCohort.class); if(dataPersistenceProvider == null){ - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); } else { - this.dataPersistenceProvider = dataPersistenceProvider; + setPersistence(dataPersistenceProvider); } } @@ -158,6 +163,16 @@ public class RaftActorTest extends AbstractActorTest { } } + + public void waitUntilLeader(){ + for(int i = 0;i < 10; i++){ + if(isLeader()){ + break; + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + public List getState() { return state; } @@ -177,27 +192,40 @@ public class RaftActorTest extends AbstractActorTest { return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier, + DataPersistenceProvider dataPersistenceProvider){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { - delegate.applyState(clientActor, identifier, data); + actorDelegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called", persistenceId()); } @Override - protected void startLogRecoveryBatch(int maxBatchSize) { + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return this; } @Override - protected void appendRecoveredLogEntry(Payload data) { + public void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + public void appendRecoveredLogEntry(Payload data) { state.add(data); } @Override - protected void applyCurrentLogRecoveryBatch() { + public void applyCurrentLogRecoveryBatch() { } @Override protected void onRecoveryComplete() { - delegate.onRecoveryComplete(); + actorDelegate.onRecoveryComplete(); recoveryComplete.countDown(); } @@ -208,8 +236,8 @@ public class RaftActorTest extends AbstractActorTest { } @Override - protected void applyRecoverySnapshot(byte[] bytes) { - delegate.applyRecoverySnapshot(bytes); + public void applyRecoverySnapshot(byte[] bytes) { + cohortDelegate.applyRecoverySnapshot(bytes); try { Object data = toObject(bytes); if (data instanceof List) { @@ -222,21 +250,16 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void createSnapshot() { LOG.info("{}: createSnapshot called", persistenceId()); - delegate.createSnapshot(); + actorDelegate.createSnapshot(); } @Override protected void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); - delegate.applySnapshot(snapshot); + actorDelegate.applySnapshot(snapshot); } @Override protected void onStateChanged() { - delegate.onStateChanged(); - } - - @Override - protected DataPersistenceProvider persistence() { - return this.dataPersistenceProvider; + actorDelegate.onStateChanged(); } @Override @@ -502,7 +525,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); + verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -524,7 +547,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 1, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); @@ -569,7 +592,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); + verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -589,7 +612,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); @@ -674,11 +697,13 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); - verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class)); } }; } @@ -702,9 +727,11 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); - verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); } @@ -766,7 +793,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + ImmutableMap.of("leader", "fake/path"), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -792,7 +819,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); - verify(mockRaftActor.delegate).createSnapshot(); + verify(mockRaftActor.actorDelegate).createSnapshot(); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -842,7 +869,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); - verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); } }; @@ -889,7 +916,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); + verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); @@ -949,7 +976,7 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testRaftRoleChangeNotifier() throws Exception { + public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception { new JavaTestKit(getSystem()) {{ TestActorRef notifierActor = factory.createTestActor( Props.create(MessageCollectorActor.class)); @@ -958,15 +985,17 @@ public class RaftActorTest extends AbstractActorTest { DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); long heartBeatInterval = 100; config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); - config.setElectionTimeoutFactor(1); + config.setElectionTimeoutFactor(20); String persistenceId = factory.generateActorId("notifier-"); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); + Collections.emptyMap(), Optional.of(config), notifierActor, + new NonPersistentDataProvider()), persistenceId); List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); + // check if the notifier got a role change from null to Follower RoleChanged raftRoleChanged = matches.get(0); assertEquals(persistenceId, raftRoleChanged.getMemberId()); @@ -1022,6 +1051,49 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(1); + + String persistenceId = factory.generateActorId("notifier-"); + + factory.createActor(MockRaftActor.props(persistenceId, + ImmutableMap.of("leader", "fake/path"), Optional.of(config), notifierActor), persistenceId); + + List matches = null; + for(int i = 0; i < 5000 / heartBeatInterval; i++) { + matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + if(matches.size() == 3) { + break; + } + Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); + } + + assertEquals(2, matches.size()); + + // check if the notifier got a role change from null to Follower + RoleChanged raftRoleChanged = matches.get(0); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertNull(raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Follower to Candidate + raftRoleChanged = matches.get(1); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + }}; + } + @Test public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { new JavaTestKit(getSystem()) { @@ -1068,7 +1140,7 @@ public class RaftActorTest extends AbstractActorTest { .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4); - verify(leaderActor.delegate).createSnapshot(); + verify(leaderActor.actorDelegate).createSnapshot(); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1094,13 +1166,13 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider() + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider() , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1167,7 +1239,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("D")), 4); - verify(followerActor.delegate).createSnapshot(); + verify(followerActor.actorDelegate).createSnapshot(); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -1204,7 +1276,7 @@ public class RaftActorTest extends AbstractActorTest { assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -1313,38 +1385,6 @@ public class RaftActorTest extends AbstractActorTest { }; } - - private static class NonPersistentProvider implements DataPersistenceProvider { - @Override - public boolean isRecoveryApplicable() { - return false; - } - - @Override - public void persist(T o, Procedure procedure) { - try { - procedure.apply(o); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void saveSnapshot(Object o) { - - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - - } - - @Override - public void deleteMessages(long sequenceNumber) { - - } - } - @Test public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { new JavaTestKit(getSystem()) {{ @@ -1354,7 +1394,7 @@ public class RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>(); @@ -1401,7 +1441,7 @@ public class RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>();