X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorRecoverySupportTest.java;h=0d29c31ee4b3b1d5f9e879c3a014317fa188e850;hp=4855f42931da658d3d6f7912edb930fd78498bf8;hb=33877f41ffc3f8eb36ad8490315419b90817d26e;hpb=4887ec8e6bfe82072caed8612573209e349a3e2b diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index 4855f42931..0d29c31ee4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -14,17 +14,32 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotOffer; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; +import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatchers; @@ -44,6 +59,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,30 +75,31 @@ public class RaftActorRecoverySupportTest { @Mock private DataPersistenceProvider mockPersistence; - @Mock private RaftActorRecoveryCohort mockCohort; - @Mock - private RaftActorSnapshotCohort mockSnapshotCohort; - @Mock PersistentDataProvider mockPersistentProvider; + ActorRef mockActorRef; + + ActorSystem mockActorSystem; + private RaftActorRecoverySupport support; private RaftActorContext context; private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); private final String localId = "leader"; - @Before public void setup() { MockitoAnnotations.initMocks(this); - - context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", - LOG), -1, -1, Collections.emptyMap(), configParams, - mockPersistence, applyState -> { }, LOG, MoreExecutors.directExecutor()); + mockActorSystem = ActorSystem.create(); + mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class)); + context = new RaftActorContextImpl(mockActorRef, null, localId, + new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1, + Collections.emptyMap(), configParams, mockPersistence, applyState -> { + }, LOG, MoreExecutors.directExecutor()); support = new RaftActorRecoverySupport(context, mockCohort); @@ -159,6 +176,40 @@ public class RaftActorRecoverySupportTest { inOrder.verifyNoMoreInteractions(); } + @Test + public void testIncrementalRecovery() { + int recoverySnapshotInterval = 3; + int numberOfEntries = 5; + configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval); + Consumer> mockSnapshotConsumer = mock(Consumer.class); + context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer); + + ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor(); + ReplicatedLog replicatedLog = context.getReplicatedLog(); + + for (int i = 0; i <= numberOfEntries; i++) { + replicatedLog.append(new SimpleReplicatedLogEntry(i, 1, + new MockRaftActorContext.MockPayload(String.valueOf(i)))); + } + + AtomicInteger entryCount = new AtomicInteger(); + ScheduledFuture applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> { + int run = entryCount.getAndIncrement(); + LOG.info("Sending entry number {}", run); + sendMessageToSupport(new ApplyJournalEntries(run)); + }, 0, 1, TimeUnit.SECONDS); + + ScheduledFuture canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false), + numberOfEntries, TimeUnit.SECONDS); + try { + canceller.get(); + verify(mockSnapshotConsumer, times(1)).accept(any()); + applyEntriesExecutor.shutdown(); + } catch (InterruptedException | ExecutionException e) { + Assert.fail(); + } + } + @Test public void testOnSnapshotOffer() { @@ -184,7 +235,7 @@ public class RaftActorRecoverySupportTest { lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null); SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); - SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot); + SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot); sendMessageToSupport(snapshotOffer); @@ -298,8 +349,8 @@ public class RaftActorRecoverySupportTest { } static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) { - return ArgumentMatchers.argThat( - other -> term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor())); + return ArgumentMatchers.argThat(other -> + term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor())); } @Test @@ -380,16 +431,16 @@ public class RaftActorRecoverySupportTest { long electionTerm = 2; String electionVotedFor = "member-2"; ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList( - new ServerInfo(localId, true), - new ServerInfo("follower1", true), - new ServerInfo("follower2", true))); + new ServerInfo(localId, true), + new ServerInfo("follower1", true), + new ServerInfo("follower2", true))); MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1"))); Snapshot snapshot = Snapshot.create(snapshotState, Collections.emptyList(), -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload); SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); - SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot); + SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot); sendMessageToSupport(snapshotOffer); @@ -398,6 +449,6 @@ public class RaftActorRecoverySupportTest { assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor()); assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"), - Sets.newHashSet(context.getPeerIds())); + Sets.newHashSet(context.getPeerIds())); } -} +} \ No newline at end of file