X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=9e0e06c70bb694fdbd3cfe6b654877023e27a361;hp=fd9912244a620c6bc060cb465dd0da51449ce47d;hb=bd8beb1bfee9f421ad8f2d07b1424b21038234a2;hpb=3def254d3e8d2f24038ddfb7d1b1749ca2135fe2 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index fd9912244a..9e0e06c70b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,52 +1,80 @@ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; -import akka.event.Logging; import akka.japi.Creator; +import akka.japi.Procedure; +import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.Follower; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.mockito.Mockito.mock; - public class RaftActorTest extends AbstractActorTest { @@ -59,36 +87,44 @@ public class RaftActorTest extends AbstractActorTest { public static class MockRaftActor extends RaftActor { private final DataPersistenceProvider dataPersistenceProvider; + private final RaftActor delegate; + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final List state; + private ActorRef roleChangeNotifier; + private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); public static final class MockRaftActorCreator implements Creator { + private static final long serialVersionUID = 1L; private final Map peerAddresses; private final String id; private final Optional config; private final DataPersistenceProvider dataPersistenceProvider; + private final ActorRef roleChangeNotifier; private MockRaftActorCreator(Map peerAddresses, String id, - Optional config, DataPersistenceProvider dataPersistenceProvider) { + Optional config, DataPersistenceProvider dataPersistenceProvider, + ActorRef roleChangeNotifier) { this.peerAddresses = peerAddresses; this.id = id; this.config = config; this.dataPersistenceProvider = dataPersistenceProvider; + this.roleChangeNotifier = roleChangeNotifier; } @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider); + MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config, + dataPersistenceProvider); + mockRaftActor.roleChangeNotifier = this.roleChangeNotifier; + return mockRaftActor; } } - private final CountDownLatch recoveryComplete = new CountDownLatch(1); - private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1); - private final CountDownLatch applyStateLatch = new CountDownLatch(1); - - private final List state; - - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { + public MockRaftActor(String id, Map peerAddresses, Optional config, + DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); + this.delegate = mock(RaftActor.class); if(dataPersistenceProvider == null){ this.dataPersistenceProvider = new PersistentDataProvider(); } else { @@ -104,8 +140,12 @@ public class RaftActorTest extends AbstractActorTest { } } - public CountDownLatch getApplyRecoverySnapshotLatch(){ - return applyRecoverySnapshot; + public void waitForInitializeBehaviorComplete() { + try { + assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } } public List getState() { @@ -114,17 +154,22 @@ public class RaftActorTest extends AbstractActorTest { public static Props props(final String id, final Map peerAddresses, Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null)); } public static Props props(final String id, final Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); + } @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { - applyStateLatch.countDown(); + delegate.applyState(clientActor, identifier, data); + LOG.info("applyState called"); } @Override @@ -142,17 +187,23 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void onRecoveryComplete() { + delegate.onRecoveryComplete(); recoveryComplete.countDown(); } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { - applyRecoverySnapshot.countDown(); + protected void initializeBehavior() { + super.initializeBehavior(); + initializeBehaviorComplete.countDown(); + } + + @Override + protected void applyRecoverySnapshot(byte[] bytes) { + delegate.applyRecoverySnapshot(bytes); try { - Object data = toObject(snapshot); - System.out.println("!!!!!applyRecoverySnapshot: "+data); + Object data = toObject(bytes); if (data instanceof List) { - state.addAll((List) data); + state.addAll((List) data); } } catch (Exception e) { e.printStackTrace(); @@ -160,12 +211,15 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void createSnapshot() { + delegate.createSnapshot(); } - @Override protected void applySnapshot(ByteString snapshot) { + @Override protected void applySnapshot(byte [] snapshot) { + delegate.applySnapshot(snapshot); } @Override protected void onStateChanged() { + delegate.onStateChanged(); } @Override @@ -173,16 +227,21 @@ public class RaftActorTest extends AbstractActorTest { return this.dataPersistenceProvider; } + @Override + protected Optional getRoleChangeNotifier() { + return Optional.fromNullable(roleChangeNotifier); + } + @Override public String persistenceId() { return this.getId(); } - private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; try { - bis = new ByteArrayInputStream(bs.toByteArray()); + bis = new ByteArrayInputStream(bs); ois = new ObjectInputStream(bis); obj = ois.readObject(); } finally { @@ -210,51 +269,69 @@ public class RaftActorTest extends AbstractActorTest { super(actorSystem); raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName, - Collections.EMPTY_MAP, Optional.absent()), actorName); + Collections.emptyMap(), Optional.absent()), actorName); } - public boolean waitForStartup(){ + public ActorRef getRaftActor() { + return raftActor; + } + + public boolean waitForLogMessage(final Class logEventClass, String message){ // Wait for a specific log message to show up return - new JavaTestKit.EventFilter(Logging.Info.class + new JavaTestKit.EventFilter(logEventClass ) { @Override protected Boolean run() { return true; } }.from(raftActor.path().toString()) - .message("Switching from behavior Candidate to Leader") + .message(message) .occurrences(1).exec(); } - public void findLeader(final String expectedLeader){ - raftActor.tell(new FindLeader(), getRef()); - - FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor()); + protected void waitUntilLeader(){ + waitUntilLeader(raftActor); } - public ActorRef getRaftActor() { - return raftActor; + protected void waitUntilLeader(ActorRef actorRef) { + FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); + for(int i = 0; i < 20 * 5; i++) { + Future future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration)); + try { + FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration); + if(resp.getLeaderActor() != null) { + return; + } + } catch(TimeoutException e) { + } catch(Exception e) { + System.err.println("FindLeader threw ex"); + e.printStackTrace(); + } + + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Leader not found for actorRef " + actorRef.path()); } + } @Test public void testConstruction() { - boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup(); - assertEquals(true, started); + new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader(); } @Test public void testFindLeaderWhenLeaderIsSelf(){ RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader"); - kit.waitForStartup(); - kit.findLeader(kit.getRaftActor().path().toString()); + kit.waitUntilLeader(); } @Test @@ -269,7 +346,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + Collections.emptyMap(), Optional.of(config)), persistenceId); watch(followerActor); @@ -283,10 +360,10 @@ public class RaftActorTest extends AbstractActorTest { // 4 messages as part of snapshot, which are applied to state ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , @@ -323,7 +400,7 @@ public class RaftActorTest extends AbstractActorTest { //reinstate the actor TestActorRef ref = TestActorRef.create(getSystem(), - MockRaftActor.props(persistenceId, Collections.EMPTY_MAP, + MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config))); ref.underlyingActor().waitForRecoveryComplete(); @@ -356,7 +433,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + Collections.emptyMap(), Optional.of(config)), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -374,9 +451,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch(); - - assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS)); + verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -427,7 +502,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); + Collections.emptyMap(), Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -445,9 +520,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch(); - - assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS)); + verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -494,7 +567,7 @@ public class RaftActorTest extends AbstractActorTest { dataPersistenceProviderMonitor.setPersistLatch(persistLatch); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -518,18 +591,18 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - CountDownLatch persistLatch = new CountDownLatch(1); - DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class))); + MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)); - assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); + mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry); + + verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class)); mockActorRef.tell(PoisonPill.getInstance(), getRef()); @@ -547,12 +620,10 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - CountDownLatch persistLatch = new CountDownLatch(2); - DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -560,7 +631,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); - assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); + verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); mockActorRef.tell(PoisonPill.getInstance(), getRef()); @@ -578,18 +649,16 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - CountDownLatch persistLatch = new CountDownLatch(1); - DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); mockRaftActor.onReceiveCommand(new ApplyLogEntries(10)); - assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); + verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); mockActorRef.tell(PoisonPill.getInstance(), getRef()); @@ -607,12 +676,11 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - CountDownLatch persistLatch = new CountDownLatch(1); - DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch); + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId,Collections.emptyMap(), + Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -624,9 +692,13 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + + verify(dataPersistenceProvider).saveSnapshot(anyObject()); mockActorRef.tell(PoisonPill.getInstance(), getRef()); @@ -644,14 +716,154 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - CountDownLatch deleteMessagesLatch = new CountDownLatch(1); - CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1); + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class))); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); + + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); + + verify(mockRaftActor.delegate).createSnapshot(); + + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + + mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100))); + + verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + + verify(dataPersistenceProvider).deleteMessages(100); + + assertEquals(2, mockRaftActor.getReplicatedLog().size()); + + assertNotNull(mockRaftActor.getReplicatedLog().get(3)); + assertNotNull(mockRaftActor.getReplicatedLog().get(4)); + + // Index 2 will not be in the log because it was removed due to snapshotting + assertNull(mockRaftActor.getReplicatedLog().get(2)); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testApplyState() throws Exception { + + new JavaTestKit(getSystem()) { + { + String persistenceId = "testApplyState"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("F")); + + mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); + + verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testApplySnapshot() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testApplySnapshot"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog(); + + oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class))); + oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class))); + oldReplicatedLog.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + mock(Payload.class))); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + Snapshot snapshot = mock(Snapshot.class); + + doReturn(snapshotBytes.toByteArray()).when(snapshot).getState(); + + doReturn(3L).when(snapshot).getLastAppliedIndex(); + + mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); + + verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); + + assertTrue("The replicatedLog should have changed", + oldReplicatedLog != mockRaftActor.getReplicatedLog()); + + assertEquals("lastApplied should be same as in the snapshot", + (Long) 3L, mockRaftActor.getLastApplied()); + + assertEquals(0, mockRaftActor.getReplicatedLog().size()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testSaveSnapshotFailure() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testSaveSnapshotFailure"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch); - dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -661,15 +873,245 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100))); + mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L), + new Exception())); + + assertEquals("Snapshot index should not have advanced because save snapshot failed", -1, + mockRaftActor.getReplicatedLog().getSnapshotIndex()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testRaftRoleChangeNotifier() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + String id = "testRaftRoleChangeNotifier"; + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id, + Collections.emptyMap(), Optional.of(config), notifierActor), id); + + // sleeping for a minimum of 2 seconds, if it spans more its fine. + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + assertEquals(3, matches.size()); + + // check if the notifier got a role change from null to Follower + RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); + assertEquals(id, raftRoleChanged.getMemberId()); + assertNull(raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Follower to Candidate + raftRoleChanged = (RoleChanged) matches.get(1); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Candidate to Leader + raftRoleChanged = (RoleChanged) matches.get(2); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + }}; + } + + @Test + public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "leader1"; + + ActorRef followerActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(4); + leaderActor.getRaftActorContext().setLastApplied(4); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); - assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS)); + leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1)); + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(leaderActor.delegate).createSnapshot(); - assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS)); + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + //fake snapshot on index 5 + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1)); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1)); + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, leaderActor.getReplicatedLog().size()); + assertEquals(7, leaderActor.getReplicatedLog().lastIndex()); + + // add another non-replicated entry + leaderActor.getReplicatedLog().append( + new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); + + //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1)); + assertEquals(2, leaderActor.getReplicatedLog().size()); + assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "follower1"; + + ActorRef leaderActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("leader", leaderActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor followerActor = mockActorRef.underlyingActor(); + followerActor.getRaftActorContext().setCommitIndex(4); + followerActor.getRaftActorContext().setLastApplied(4); + followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + followerActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); + followerActor.setCurrentBehavior(follower); + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); + + // log as indices 0-5 + assertEquals(6, followerActor.getReplicatedLog().size()); + + //snapshot on 4 + followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1)); + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(followerActor.delegate).createSnapshot(); + + assertEquals(6, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("foo-6")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5)); + assertEquals(7, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 7 + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, + new MockRaftActorContext.MockPayload("foo-7")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6)); + assertEquals(8, followerActor.getReplicatedLog().size()); + + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log + assertEquals(7, followerActor.getReplicatedLog().lastIndex()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8, + new MockRaftActorContext.MockPayload("foo-7")) + ); + // send an additional entry 8 with leaderCommit = 7 + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7)); + + // 7 and 8, as lastapplied is 7 + assertEquals(2, followerActor.getReplicatedLog().size()); mockActorRef.tell(PoisonPill.getInstance(), getRef()); @@ -677,6 +1119,91 @@ public class RaftActorTest extends AbstractActorTest { }; } + @Test + public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "leader1"; + + ActorRef followerActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef followerActor2 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + peerAddresses.put("follower-2", followerActor2.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(9); + leaderActor.getRaftActorContext().setLastApplied(9); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // create 5 entries in the log + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build()); + //set the snapshot index to 4 , 0 to 4 are snapshotted + leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); + assertEquals(5, leaderActor.getReplicatedLog().size()); + + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1)); + assertEquals(5, leaderActor.getReplicatedLog().size()); + + // set the 2nd follower nextIndex to 1 which has been snapshotted + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1)); + assertEquals(5, leaderActor.getReplicatedLog().size()); + + // simulate a real snapshot + leaderActor.onReceiveCommand(new InitiateInstallSnapshot()); + assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ", + leaderActor.getCurrentBehavior().state(),leaderActor.getLeaderId()) + , RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + + //reply from a slow follower does not initiate a fake snapshot + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1)); + assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size()); + + //reply from a slow follower after should not raise errors + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1)); + assertEquals(0, leaderActor.getReplicatedLog().size()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null;