X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=cf7af439e5c5cb362975bdee0008b8941e74dd45;hp=d999bb2ba1de79e59d65b5cd78614bc8b3fc2c7c;hb=9b43c3ce7c46eaae9060f3ed18e6c39f76ede2e5;hpb=800a4b8dda7df2d3ee0caa5d2d7d130bb94efa52 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index d999bb2ba1..cf7af439e5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -29,6 +29,7 @@ import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -45,10 +46,13 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; @@ -57,7 +61,9 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -86,6 +92,7 @@ public class RaftActorTest extends AbstractActorTest { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; + private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); public static final class MockRaftActorCreator implements Creator { private static final long serialVersionUID = 1L; @@ -114,7 +121,8 @@ public class RaftActorTest extends AbstractActorTest { } } - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { + public MockRaftActor(String id, Map peerAddresses, Optional config, + DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); this.delegate = mock(RaftActor.class); @@ -133,6 +141,14 @@ public class RaftActorTest extends AbstractActorTest { } } + public void waitForInitializeBehaviorComplete() { + try { + assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + public List getState() { return state; } @@ -177,11 +193,16 @@ public class RaftActorTest extends AbstractActorTest { } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { - delegate.applyRecoverySnapshot(snapshot); + protected void initializeBehavior() { + super.initializeBehavior(); + initializeBehaviorComplete.countDown(); + } + + @Override + protected void applyRecoverySnapshot(byte[] bytes) { + delegate.applyRecoverySnapshot(bytes); try { - Object data = toObject(snapshot); - System.out.println("!!!!!applyRecoverySnapshot: "+data); + Object data = toObject(bytes); if (data instanceof List) { state.addAll((List) data); } @@ -194,7 +215,7 @@ public class RaftActorTest extends AbstractActorTest { delegate.createSnapshot(); } - @Override protected void applySnapshot(ByteString snapshot) { + @Override protected void applySnapshot(byte [] snapshot) { delegate.applySnapshot(snapshot); } @@ -216,12 +237,12 @@ public class RaftActorTest extends AbstractActorTest { return this.getId(); } - private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; try { - bis = new ByteArrayInputStream(bs.toByteArray()); + bis = new ByteArrayInputStream(bs); ois = new ObjectInputStream(bis); obj = ois.readObject(); } finally { @@ -340,10 +361,10 @@ public class RaftActorTest extends AbstractActorTest { // 4 messages as part of snapshot, which are applied to state ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , @@ -431,7 +452,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes)); + verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -500,7 +521,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class)); + verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -676,7 +697,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); verify(dataPersistenceProvider).saveSnapshot(anyObject()); @@ -722,7 +743,7 @@ public class RaftActorTest extends AbstractActorTest { verify(mockRaftActor.delegate).createSnapshot(); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100))); @@ -814,7 +835,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes)); + verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); @@ -859,7 +880,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L), new Exception())); @@ -910,6 +931,277 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "leader1"; + + ActorRef followerActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(4); + leaderActor.getRaftActorContext().setLastApplied(4); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1)); + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(leaderActor.delegate).createSnapshot(); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + //fake snapshot on index 5 + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1)); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1)); + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, leaderActor.getReplicatedLog().size()); + assertEquals(7, leaderActor.getReplicatedLog().lastIndex()); + + // add another non-replicated entry + leaderActor.getReplicatedLog().append( + new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); + + //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1)); + assertEquals(2, leaderActor.getReplicatedLog().size()); + assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "follower1"; + + ActorRef leaderActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("leader", leaderActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor followerActor = mockActorRef.underlyingActor(); + followerActor.getRaftActorContext().setCommitIndex(4); + followerActor.getRaftActorContext().setLastApplied(4); + followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + followerActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); + followerActor.setCurrentBehavior(follower); + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); + + // log as indices 0-5 + assertEquals(6, followerActor.getReplicatedLog().size()); + + //snapshot on 4 + followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1)); + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(followerActor.delegate).createSnapshot(); + + assertEquals(6, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("foo-6")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5)); + assertEquals(7, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 7 + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, + new MockRaftActorContext.MockPayload("foo-7")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6)); + assertEquals(8, followerActor.getReplicatedLog().size()); + + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log + assertEquals(7, followerActor.getReplicatedLog().lastIndex()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8, + new MockRaftActorContext.MockPayload("foo-7")) + ); + // send an additional entry 8 with leaderCommit = 7 + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7)); + + // 7 and 8, as lastapplied is 7 + assertEquals(2, followerActor.getReplicatedLog().size()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "leader1"; + + ActorRef followerActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef followerActor2 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + peerAddresses.put("follower-2", followerActor2.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(9); + leaderActor.getRaftActorContext().setLastApplied(9); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // create 5 entries in the log + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build()); + //set the snapshot index to 4 , 0 to 4 are snapshotted + leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); + assertEquals(5, leaderActor.getReplicatedLog().size()); + + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1)); + assertEquals(5, leaderActor.getReplicatedLog().size()); + + // set the 2nd follower nextIndex to 1 which has been snapshotted + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1)); + assertEquals(5, leaderActor.getReplicatedLog().size()); + + // simulate a real snapshot + leaderActor.onReceiveCommand(new InitiateInstallSnapshot()); + assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + //reply from a slow follower does not initiate a fake snapshot + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1)); + assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size()); + + //reply from a slow follower after should not raise errors + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1)); + assertEquals(0, leaderActor.getReplicatedLog().size()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null;