import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.AdditionalMatchers.aryEq;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
-import org.hamcrest.Description;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
-import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Mock
private DataPersistenceProvider mockPersistence;
-
@Mock
private RaftActorRecoveryCohort mockCohort;
@Mock
PersistentDataProvider mockPersistentProvider;
+ ActorRef mockActorRef;
+
+ ActorSystem mockActorSystem;
+
private RaftActorRecoverySupport support;
private RaftActorContext context;
private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
private final String localId = "leader";
-
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
-
- context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
- LOG), -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+ mockActorSystem = ActorSystem.create();
+ mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
+ context = new RaftActorContextImpl(mockActorRef, null, localId,
+ new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
+ Collections.<String, String>emptyMap(), configParams, mockPersistence, applyState -> {
+ }, LOG, MoreExecutors.directExecutor());
support = new RaftActorRecoverySupport(context, mockCohort);
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
- private void sendMessageToSupport(Object message) {
+ private void sendMessageToSupport(final Object message) {
sendMessageToSupport(message, false);
}
- private void sendMessageToSupport(Object message, boolean expComplete) {
+ private void sendMessageToSupport(final Object message, final boolean expComplete) {
boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
assertEquals("complete", expComplete, complete);
}
inOrder.verifyNoMoreInteractions();
}
+ @Test
+ public void testIncrementalRecovery() {
+ int recoverySnapshotInterval = 3;
+ int numberOfEntries = 5;
+ configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+ Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
+ context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
+
+ ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
+ ReplicatedLog replicatedLog = context.getReplicatedLog();
+
+ for (int i = 0; i <= numberOfEntries; i++) {
+ replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
+ new MockRaftActorContext.MockPayload(String.valueOf(i))));
+ }
+
+ AtomicInteger entryCount = new AtomicInteger();
+ ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
+ int run = entryCount.getAndIncrement();
+ LOG.info("Sending entry number {}", run);
+ sendMessageToSupport(new ApplyJournalEntries(run));
+ }, 0, 1, TimeUnit.SECONDS);
+
+ ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
+ numberOfEntries, TimeUnit.SECONDS);
+ try {
+ canceller.get();
+ verify(mockSnapshotConsumer, times(1)).accept(any());
+ applyEntriesExecutor.shutdown();
+ } catch (InterruptedException | ExecutionException e) {
+ Assert.fail();
+ }
+ }
+
@Test
public void testOnSnapshotOffer() {
replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
- byte[] snapshotBytes = {1,2,3,4,5};
-
ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
new MockRaftActorContext.MockPayload("4", 4));
long electionTerm = 2;
String electionVotedFor = "member-2";
- Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
- lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
+ MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+ Snapshot snapshot = Snapshot.create(snapshotState,
+ Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
+ lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
- SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+ SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
sendMessageToSupport(snapshotOffer);
assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
- verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
+ verify(mockCohort).applyRecoverySnapshot(snapshotState);
}
@Test
@Test
public void testDataRecoveredWithPersistenceDisabled() {
- doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+ doNothing().when(mockCohort).applyRecoverySnapshot(any());
doReturn(false).when(mockPersistence).isRecoveryApplicable();
doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
- Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+ Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
+ Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
sendMessageToSupport(snapshotOffer);
sendMessageToSupport(RecoveryCompleted.getInstance(), true);
- verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+ verify(mockCohort, never()).applyRecoverySnapshot(any());
verify(mockCohort, never()).getRestoreFromSnapshot();
verifyNoMoreInteractions(mockCohort);
}
static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
- return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
- @Override
- public boolean matches(Object argument) {
- UpdateElectionTerm other = (UpdateElectionTerm) argument;
- return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendValue(new UpdateElectionTerm(term, votedFor));
- }
- });
+ return ArgumentMatchers.argThat(other ->
+ term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
}
@Test
long electionTerm = 2;
String electionVotedFor = "member-2";
ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
- new ServerInfo(localId, true),
- new ServerInfo("follower1", true),
- new ServerInfo("follower2", true)));
+ new ServerInfo(localId, true),
+ new ServerInfo("follower1", true),
+ new ServerInfo("follower2", true)));
- Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
+ MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+ Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
-1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
- SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+ SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
sendMessageToSupport(snapshotOffer);
assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
- Sets.newHashSet(context.getPeerIds()));
+ Sets.newHashSet(context.getPeerIds()));
}
-}
+}
\ No newline at end of file