X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=a0663efdd5919bcb21cab32407aed4460536105b;hp=cf7af439e5c5cb362975bdee0008b8941e74dd45;hb=03e752cbd625921ece92c5281cd4e1a8c81b3210;hpb=1462922f6fcf1d17a2b62cbfc6a9bc558fc6ae1c diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index cf7af439e5..a0663efdd5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,5 +1,18 @@ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; @@ -37,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; @@ -46,7 +60,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; -import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -62,25 +76,18 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - public class RaftActorTest extends AbstractActorTest { + private TestActorFactory factory; + + @Before + public void setUp(){ + factory = new TestActorFactory(getSystem()); + } @After - public void tearDown() { + public void tearDown() throws Exception { + factory.close(); MockAkkaJournal.clearJournal(); MockSnapshotStore.setMockSnapshot(null); } @@ -338,7 +345,7 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftActorRecovery() throws Exception { new JavaTestKit(getSystem()) {{ - String persistenceId = "follower10"; + String persistenceId = factory.generateActorId("follower-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); // Set the heartbeat interval high to essentially disable election otherwise the test @@ -346,8 +353,8 @@ public class RaftActorTest extends AbstractActorTest { // log entry. config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config)), persistenceId); + ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config)), persistenceId); watch(followerActor); @@ -359,15 +366,15 @@ public class RaftActorTest extends AbstractActorTest { int lastAppliedDuringSnapshotCapture = 3; int lastIndexDuringSnapshotCapture = 4; - // 4 messages as part of snapshot, which are applied to state - ByteString snapshotBytes = fromObject(Arrays.asList( + // 4 messages as part of snapshot, which are applied to state + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , + snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1); MockSnapshotStore.setMockSnapshot(snapshot); MockSnapshotStore.setPersistenceId(persistenceId); @@ -400,8 +407,8 @@ public class RaftActorTest extends AbstractActorTest { unwatch(followerActor); //reinstate the actor - TestActorRef ref = TestActorRef.create(getSystem(), - MockRaftActor.props(persistenceId, Collections.emptyMap(), + TestActorRef ref = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config))); ref.underlyingActor().waitForRecoveryComplete(); @@ -427,28 +434,28 @@ public class RaftActorTest extends AbstractActorTest { public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config)), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config)), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); // Wait for akka's recovery to complete so it doesn't interfere. mockRaftActor.waitForRecoveryComplete(); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - Lists.newArrayList(), 3, 1 ,3, 1); + Lists.newArrayList(), 3, 1, 3, 1); mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); @@ -481,8 +488,6 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - }}; } @@ -496,28 +501,28 @@ public class RaftActorTest extends AbstractActorTest { public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); // Wait for akka's recovery to complete so it doesn't interfere. mockRaftActor.waitForRecoveryComplete(); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - Lists.newArrayList(), 3, 1 ,3, 1); + Lists.newArrayList(), 3, 1, 3, 1); mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); @@ -548,7 +553,6 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); }}; } @@ -557,7 +561,7 @@ public class RaftActorTest extends AbstractActorTest { public void testUpdatingElectionTermCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testUpdatingElectionTermCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -567,8 +571,8 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); dataPersistenceProviderMonitor.setPersistLatch(persistLatch); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -576,9 +580,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } + }; } @@ -586,7 +589,7 @@ public class RaftActorTest extends AbstractActorTest { public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -594,8 +597,8 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -605,9 +608,8 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } + }; } @@ -615,7 +617,7 @@ public class RaftActorTest extends AbstractActorTest { public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -623,8 +625,8 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -634,9 +636,8 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } + }; } @@ -644,7 +645,7 @@ public class RaftActorTest extends AbstractActorTest { public void testApplyLogEntriesCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testApplyLogEntriesCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -652,8 +653,8 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -661,9 +662,8 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } + }; } @@ -671,7 +671,7 @@ public class RaftActorTest extends AbstractActorTest { public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -679,19 +679,19 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), - MockRaftActor.props(persistenceId,Collections.emptyMap(), - Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, Collections.emptyMap(), + Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); @@ -701,8 +701,6 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).saveSnapshot(anyObject()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -711,7 +709,7 @@ public class RaftActorTest extends AbstractActorTest { public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -719,16 +717,16 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -759,8 +757,6 @@ public class RaftActorTest extends AbstractActorTest { // Index 2 will not be in the log because it was removed due to snapshotting assertNull(mockRaftActor.getReplicatedLog().get(2)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -770,7 +766,7 @@ public class RaftActorTest extends AbstractActorTest { new JavaTestKit(getSystem()) { { - String persistenceId = "testApplyState"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -778,8 +774,8 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -790,8 +786,6 @@ public class RaftActorTest extends AbstractActorTest { verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -800,7 +794,7 @@ public class RaftActorTest extends AbstractActorTest { public void testApplySnapshot() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testApplySnapshot"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -808,24 +802,24 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog(); - oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class))); - oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class))); + oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); + oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); oldReplicatedLog.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, - mock(Payload.class))); + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + mock(Payload.class))); ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = mock(Snapshot.class); @@ -838,15 +832,13 @@ public class RaftActorTest extends AbstractActorTest { verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", - oldReplicatedLog != mockRaftActor.getReplicatedLog()); + oldReplicatedLog != mockRaftActor.getReplicatedLog()); assertEquals("lastApplied should be same as in the snapshot", - (Long) 3L, mockRaftActor.getLastApplied()); + (Long) 3L, mockRaftActor.getLastApplied()); assertEquals(0, mockRaftActor.getReplicatedLog().size()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -855,7 +847,7 @@ public class RaftActorTest extends AbstractActorTest { public void testSaveSnapshotFailure() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "testSaveSnapshotFailure"; + String persistenceId = factory.generateActorId("leader-"); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -863,12 +855,12 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), @@ -878,7 +870,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -888,8 +880,6 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Snapshot index should not have advanced because save snapshot failed", -1, mockRaftActor.getReplicatedLog().getSnapshotIndex()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -897,35 +887,35 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftRoleChangeNotifier() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); - String id = "testRaftRoleChangeNotifier"; + String persistenceId = factory.generateActorId("notifier-"); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id, - Collections.emptyMap(), Optional.of(config), notifierActor), id); + factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); // sleeping for a minimum of 2 seconds, if it spans more its fine. Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); assertNotNull(matches); assertEquals(3, matches.size()); // check if the notifier got a role change from null to Follower - RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); - assertEquals(id, raftRoleChanged.getMemberId()); + RoleChanged raftRoleChanged = matches.get(0); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertNull(raftRoleChanged.getOldRole()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Follower to Candidate - raftRoleChanged = (RoleChanged) matches.get(1); - assertEquals(id, raftRoleChanged.getMemberId()); + raftRoleChanged = matches.get(1); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Candidate to Leader - raftRoleChanged = (RoleChanged) matches.get(2); - assertEquals(id, raftRoleChanged.getMemberId()); + raftRoleChanged = matches.get(2); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); }}; @@ -935,10 +925,11 @@ public class RaftActorTest extends AbstractActorTest { public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "leader1"; + String persistenceId = factory.generateActorId("leader-"); + String follower1Id = factory.generateActorId("follower-"); ActorRef followerActor1 = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + factory.createActor(Props.create(MessageCollectorActor.class)); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -947,7 +938,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-1", followerActor1.path().toString()); + peerAddresses.put(follower1Id, followerActor1.path().toString()); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, peerAddresses, @@ -971,7 +962,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1)); + leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1)); leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); @@ -979,20 +970,20 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); //fake snapshot on index 5 - leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1)); assertEquals(8, leaderActor.getReplicatedLog().size()); //fake snapshot on index 6 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); - leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1)); assertEquals(8, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); assertEquals(8, leaderActor.getReplicatedLog().size()); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("foo-0"), new MockRaftActorContext.MockPayload("foo-1"), new MockRaftActorContext.MockPayload("foo-2"), @@ -1010,12 +1001,10 @@ public class RaftActorTest extends AbstractActorTest { new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied - leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1)); assertEquals(2, leaderActor.getReplicatedLog().size()); assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -1024,10 +1013,12 @@ public class RaftActorTest extends AbstractActorTest { public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "follower1"; + String persistenceId = factory.generateActorId("follower-"); + String leaderId = factory.generateActorId("leader-"); + ActorRef leaderActor1 = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + factory.createActor(Props.create(MessageCollectorActor.class)); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -1036,7 +1027,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); Map peerAddresses = new HashMap<>(); - peerAddresses.put("leader", leaderActor1.path().toString()); + peerAddresses.put(leaderId, leaderActor1.path().toString()); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, peerAddresses, @@ -1061,7 +1052,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1)); + followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1)); followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); @@ -1073,7 +1064,7 @@ public class RaftActorTest extends AbstractActorTest { (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("foo-6")) ); - followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5)); assertEquals(7, followerActor.getReplicatedLog().size()); //fake snapshot on index 7 @@ -1084,13 +1075,13 @@ public class RaftActorTest extends AbstractActorTest { (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("foo-7")) ); - followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6)); assertEquals(8, followerActor.getReplicatedLog().size()); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("foo-0"), new MockRaftActorContext.MockPayload("foo-1"), new MockRaftActorContext.MockPayload("foo-2"), @@ -1109,13 +1100,11 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-7")) ); // send an additional entry 8 with leaderCommit = 7 - followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7)); // 7 and 8, as lastapplied is 7 assertEquals(2, followerActor.getReplicatedLog().size()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } @@ -1124,12 +1113,14 @@ public class RaftActorTest extends AbstractActorTest { public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception { new JavaTestKit(getSystem()) { { - String persistenceId = "leader1"; + String persistenceId = factory.generateActorId("leader-"); + String follower1Id = factory.generateActorId("follower-"); + String follower2Id = factory.generateActorId("follower-"); ActorRef followerActor1 = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + factory.createActor(Props.create(MessageCollectorActor.class), follower1Id); ActorRef followerActor2 = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + factory.createActor(Props.create(MessageCollectorActor.class), follower2Id); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -1138,10 +1129,10 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-1", followerActor1.path().toString()); - peerAddresses.put("follower-2", followerActor2.path().toString()); + peerAddresses.put(follower1Id, followerActor1.path().toString()); + peerAddresses.put(follower2Id, followerActor2.path().toString()); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), + TestActorRef mockActorRef = factory.createTestActor( MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config), dataPersistenceProvider), persistenceId); @@ -1163,23 +1154,26 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); assertEquals(5, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); // set the 2nd follower nextIndex to 1 which has been snapshotted - leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); // simulate a real snapshot - leaderActor.onReceiveCommand(new InitiateInstallSnapshot()); + leaderActor.onReceiveCommand(new SendHeartBeat()); assertEquals(5, leaderActor.getReplicatedLog().size()); - assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ", + leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()) + , RaftState.Leader, leaderActor.getCurrentBehavior().state()); + //reply from a slow follower does not initiate a fake snapshot - leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1)); assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size()); - ByteString snapshotBytes = fromObject(Arrays.asList( + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("foo-0"), new MockRaftActorContext.MockPayload("foo-1"), new MockRaftActorContext.MockPayload("foo-2"), @@ -1191,17 +1185,12 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors - leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1)); assertEquals(0, leaderActor.getReplicatedLog().size()); - - mockActorRef.tell(PoisonPill.getInstance(), getRef()); - } }; } - - private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; @@ -1221,4 +1210,5 @@ public class RaftActorTest extends AbstractActorTest { } } } + }