X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=14bfd1d348b69dc76332fc35ec8f8f94dd80e8db;hp=a3b070e2cc4e91a60b0d1a073010b3ed15955289;hb=68fb550b416dddd0a50e0110add0a4ae9b706758;hpb=2a31c2cacb9ad8f015a49708261ea93d256f0f60 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index a3b070e2cc..14bfd1d348 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -49,19 +49,21 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -96,10 +98,11 @@ public class RaftActorTest extends AbstractActorTest { InMemorySnapshotStore.clear(); } - public static class MockRaftActor extends RaftActor { + public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { - protected DataPersistenceProvider dataPersistenceProvider; - private final RaftActor delegate; + private final RaftActor actorDelegate; + private final RaftActorRecoveryCohort recoveryCohortDelegate; + private final RaftActorSnapshotCohort snapshotCohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; @@ -136,11 +139,13 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); - this.delegate = mock(RaftActor.class); + this.actorDelegate = mock(RaftActor.class); + this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); + this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); if(dataPersistenceProvider == null){ - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); } else { - this.dataPersistenceProvider = dataPersistenceProvider; + setPersistence(dataPersistenceProvider); } } @@ -197,26 +202,37 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { - delegate.applyState(clientActor, identifier, data); + actorDelegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called", persistenceId()); } @Override - protected void startLogRecoveryBatch(int maxBatchSize) { + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return this; + } + + @Override + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return this; } @Override - protected void appendRecoveredLogEntry(Payload data) { + public void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + public void appendRecoveredLogEntry(Payload data) { state.add(data); } @Override - protected void applyCurrentLogRecoveryBatch() { + public void applyCurrentLogRecoveryBatch() { } @Override protected void onRecoveryComplete() { - delegate.onRecoveryComplete(); + actorDelegate.onRecoveryComplete(); recoveryComplete.countDown(); } @@ -227,8 +243,8 @@ public class RaftActorTest extends AbstractActorTest { } @Override - protected void applyRecoverySnapshot(byte[] bytes) { - delegate.applyRecoverySnapshot(bytes); + public void applyRecoverySnapshot(byte[] bytes) { + recoveryCohortDelegate.applyRecoverySnapshot(bytes); try { Object data = toObject(bytes); if (data instanceof List) { @@ -239,23 +255,21 @@ public class RaftActorTest extends AbstractActorTest { } } - @Override protected void createSnapshot() { + @Override + public void createSnapshot(ActorRef actorRef) { LOG.info("{}: createSnapshot called", persistenceId()); - delegate.createSnapshot(); + snapshotCohortDelegate.createSnapshot(actorRef); } - @Override protected void applySnapshot(byte [] snapshot) { + @Override + public void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); - delegate.applySnapshot(snapshot); - } - - @Override protected void onStateChanged() { - delegate.onStateChanged(); + snapshotCohortDelegate.applySnapshot(snapshot); } @Override - protected DataPersistenceProvider persistence() { - return this.dataPersistenceProvider; + protected void onStateChanged() { + actorDelegate.onStateChanged(); } @Override @@ -289,7 +303,6 @@ public class RaftActorTest extends AbstractActorTest { public ReplicatedLog getReplicatedLog(){ return this.getRaftActorContext().getReplicatedLog(); } - } @@ -404,11 +417,11 @@ public class RaftActorTest extends AbstractActorTest { // add more entries after snapshot is taken List entries = new ArrayList<>(); ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("F")); + new MockRaftActorContext.MockPayload("F", 2)); ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, - new MockRaftActorContext.MockPayload("G")); + new MockRaftActorContext.MockPayload("G", 3)); ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, - new MockRaftActorContext.MockPayload("H")); + new MockRaftActorContext.MockPayload("H", 4)); entries.add(entry2); entries.add(entry3); entries.add(entry4); @@ -438,6 +451,7 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext context = ref.underlyingActor().getRaftActorContext(); assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size()); + assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize()); assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); @@ -521,7 +535,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); + verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -543,7 +557,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 1, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); @@ -588,7 +602,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); + verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -608,7 +622,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); @@ -760,10 +774,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); - RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, -1, + new MockRaftActorContext.MockPayload("D")), -1); + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -792,12 +808,13 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(lastEntry); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -809,9 +826,10 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); long replicatedToAllIndex = 1; - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); - verify(mockRaftActor.delegate).createSnapshot(); + mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); + + verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -861,7 +879,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); - verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); } }; @@ -908,7 +926,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); + verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); @@ -951,7 +969,9 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("D")), 1); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -981,7 +1001,7 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config), notifierActor, - new NonPersistentProvider()), persistenceId); + new NonPersistentDataProvider()), persistenceId); List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); @@ -1126,10 +1146,11 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().getSnapshotManager() + .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("x")), 4); - leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); - verify(leaderActor.delegate).createSnapshot(); + verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1154,8 +1175,14 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider() + , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); + + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1218,10 +1245,11 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("D")), 4); - followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); - verify(followerActor.delegate).createSnapshot(); + verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -1255,7 +1283,10 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -1353,7 +1384,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); @@ -1364,38 +1395,6 @@ public class RaftActorTest extends AbstractActorTest { }; } - - private static class NonPersistentProvider implements DataPersistenceProvider { - @Override - public boolean isRecoveryApplicable() { - return false; - } - - @Override - public void persist(T o, Procedure procedure) { - try { - procedure.apply(o); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void saveSnapshot(Object o) { - - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - - } - - @Override - public void deleteMessages(long sequenceNumber) { - - } - } - @Test public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { new JavaTestKit(getSystem()) {{ @@ -1405,7 +1404,7 @@ public class RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>(); @@ -1437,7 +1436,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(-1, leader.getReplicatedToAllIndex()); }}; @@ -1452,7 +1451,7 @@ public class RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>(); @@ -1480,7 +1479,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(3, leader.getReplicatedToAllIndex()); }};