X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorRecoverySupportTest.java;h=3e66c708dd25dc411979b0d238650624199b59b0;hp=4855f42931da658d3d6f7912edb930fd78498bf8;hb=30eb37d8cde5a8d35e36aa38e0bab1232b242de2;hpb=7daddd72031b33ed686abe18a0813e41263aac8d diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index 4855f42931..3e66c708dd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -12,26 +12,43 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotOffer; -import com.google.common.collect.Sets; +import akka.testkit.javadsl.TestKit; import com.google.common.util.concurrent.MoreExecutors; -import java.util.Arrays; -import java.util.Collections; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentMatchers; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; @@ -44,6 +61,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,37 +70,37 @@ import org.slf4j.LoggerFactory; * * @author Thomas Pantelis */ +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class RaftActorRecoverySupportTest { - private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class); @Mock private DataPersistenceProvider mockPersistence; - @Mock private RaftActorRecoveryCohort mockCohort; - @Mock - private RaftActorSnapshotCohort mockSnapshotCohort; - @Mock PersistentDataProvider mockPersistentProvider; + ActorRef mockActorRef; + + ActorSystem mockActorSystem; + private RaftActorRecoverySupport support; private RaftActorContext context; private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); private final String localId = "leader"; - @Before public void setup() { - MockitoAnnotations.initMocks(this); - - context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", - LOG), -1, -1, Collections.emptyMap(), configParams, - mockPersistence, applyState -> { }, LOG, MoreExecutors.directExecutor()); + mockActorSystem = ActorSystem.create(); + mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class)); + context = new RaftActorContextImpl(mockActorRef, null, localId, + new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1, + Map.of(), configParams, mockPersistence, applyState -> { + }, LOG, MoreExecutors.directExecutor()); support = new RaftActorRecoverySupport(context, mockCohort); @@ -91,6 +109,11 @@ public class RaftActorRecoverySupportTest { context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); } + @After + public void tearDown() { + TestKit.shutdownActorSystem(mockActorSystem); + } + private void sendMessageToSupport(final Object message) { sendMessageToSupport(message, false); } @@ -159,6 +182,40 @@ public class RaftActorRecoverySupportTest { inOrder.verifyNoMoreInteractions(); } + @Test + public void testIncrementalRecovery() { + int recoverySnapshotInterval = 3; + int numberOfEntries = 5; + configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval); + Consumer> mockSnapshotConsumer = mock(Consumer.class); + context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer); + + ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor(); + ReplicatedLog replicatedLog = context.getReplicatedLog(); + + for (int i = 0; i <= numberOfEntries; i++) { + replicatedLog.append(new SimpleReplicatedLogEntry(i, 1, + new MockRaftActorContext.MockPayload(String.valueOf(i)))); + } + + AtomicInteger entryCount = new AtomicInteger(); + ScheduledFuture applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> { + int run = entryCount.getAndIncrement(); + LOG.info("Sending entry number {}", run); + sendMessageToSupport(new ApplyJournalEntries(run)); + }, 0, 1, TimeUnit.SECONDS); + + ScheduledFuture canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false), + numberOfEntries, TimeUnit.SECONDS); + try { + canceller.get(); + verify(mockSnapshotConsumer, times(1)).accept(any()); + applyEntriesExecutor.shutdown(); + } catch (InterruptedException | ExecutionException e) { + Assert.fail(); + } + } + @Test public void testOnSnapshotOffer() { @@ -178,13 +235,13 @@ public class RaftActorRecoverySupportTest { long electionTerm = 2; String electionVotedFor = "member-2"; - MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1"))); + MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1"))); Snapshot snapshot = Snapshot.create(snapshotState, - Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1, + List.of(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null); SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); - SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot); + SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot); sendMessageToSupport(snapshotOffer); @@ -259,12 +316,11 @@ public class RaftActorRecoverySupportTest { @Test public void testDataRecoveredWithPersistenceDisabled() { - doNothing().when(mockCohort).applyRecoverySnapshot(any()); doReturn(false).when(mockPersistence).isRecoveryApplicable(); doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber(); - Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))), - Collections.emptyList(), 3, 1, 3, 1, -1, null, null); + Snapshot snapshot = Snapshot.create(new MockSnapshotState(List.of(new MockPayload("1"))), + List.of(), 3, 1, 3, 1, -1, null, null); SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot); sendMessageToSupport(snapshotOffer); @@ -298,14 +354,12 @@ public class RaftActorRecoverySupportTest { } static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) { - return ArgumentMatchers.argThat( - other -> term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor())); + return ArgumentMatchers.argThat(other -> + term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor())); } @Test public void testNoDataRecoveredWithPersistenceDisabled() { - doReturn(false).when(mockPersistence).isRecoveryApplicable(); - sendMessageToSupport(new UpdateElectionTerm(5, "member2")); assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm()); @@ -327,7 +381,7 @@ public class RaftActorRecoverySupportTest { context.addToPeers(follower2, null, VotingState.VOTING); //add new Server - ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList( + ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of( new ServerInfo(localId, true), new ServerInfo(follower1, true), new ServerInfo(follower2, false), @@ -337,8 +391,7 @@ public class RaftActorRecoverySupportTest { //verify new peers assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); - assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3), - Sets.newHashSet(context.getPeerIds())); + assertEquals("New peer Ids", Set.of(follower1, follower2, follower3), Set.copyOf(context.getPeerIds())); assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting()); assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting()); assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting()); @@ -349,7 +402,7 @@ public class RaftActorRecoverySupportTest { verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class)); //remove existing follower1 - obj = new ServerConfigurationPayload(Arrays.asList( + obj = new ServerConfigurationPayload(List.of( new ServerInfo(localId, true), new ServerInfo("follower2", true), new ServerInfo("follower3", true))); @@ -358,7 +411,7 @@ public class RaftActorRecoverySupportTest { //verify new peers assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); - assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds())); + assertEquals("New peer Ids", Set.of(follower2, follower3), Set.copyOf(context.getPeerIds())); } @Test @@ -366,30 +419,30 @@ public class RaftActorRecoverySupportTest { doReturn(false).when(mockPersistence).isRecoveryApplicable(); String follower = "follower"; - ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList( + ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of( new ServerInfo(localId, true), new ServerInfo(follower, true))); sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj)); //verify new peers - assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds())); + assertEquals("New peer Ids", Set.of(follower), Set.copyOf(context.getPeerIds())); } @Test public void testOnSnapshotOfferWithServerConfiguration() { long electionTerm = 2; String electionVotedFor = "member-2"; - ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList( - new ServerInfo(localId, true), - new ServerInfo("follower1", true), - new ServerInfo("follower2", true))); + ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(List.of( + new ServerInfo(localId, true), + new ServerInfo("follower1", true), + new ServerInfo("follower2", true))); - MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1"))); - Snapshot snapshot = Snapshot.create(snapshotState, Collections.emptyList(), + MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1"))); + Snapshot snapshot = Snapshot.create(snapshotState, List.of(), -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload); SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); - SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot); + SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot); sendMessageToSupport(snapshotOffer); @@ -397,7 +450,6 @@ public class RaftActorRecoverySupportTest { assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm()); assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor()); assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); - assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"), - Sets.newHashSet(context.getPeerIds())); + assertEquals("Peer List", Set.of("follower1", "follower2"), Set.copyOf(context.getPeerIds())); } -} +} \ No newline at end of file