package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections;
+import org.hamcrest.Description;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
+import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Mock
private RaftActorRecoveryCohort mockCohort;
+ @Mock
+ PersistentDataProvider mockPersistentProvider;
+
private RaftActorRecoverySupport support;
private RaftActorContext context;
private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ private final String localId = "leader";
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
- context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
- -1, -1, Collections.<String,String>emptyMap(), configParams, LOG);
+ context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
+ -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
- support = new RaftActorRecoverySupport(mockPersistence, context , mockBehavior, mockCohort);
+ support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
}
private void sendMessageToSupport(Object message) {
}
private void sendMessageToSupport(Object message, boolean expComplete) {
- boolean complete = support.handleRecoveryMessage(message);
+ boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
assertEquals("complete", expComplete, complete);
}
ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
5, new MockRaftActorContext.MockPayload("5", 5));
- int lastAppliedDuringSnapshotCapture = 3;
- int lastIndexDuringSnapshotCapture = 5;
+ long lastAppliedDuringSnapshotCapture = 3;
+ long lastIndexDuringSnapshotCapture = 5;
+ long electionTerm = 2;
+ String electionVotedFor = "member-2";
Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
- lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+ lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+ assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+ assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
}
}
inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
-
+ inOrder.verify(mockCohort).getRestoreFromSnapshot();
inOrder.verifyNoMoreInteractions();
}
public void testOnRecoveryCompletedWithNoRemainingBatch() {
sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+ verify(mockCohort).getRestoreFromSnapshot();
verifyNoMoreInteractions(mockCohort);
}
}
@Test
- public void testRecoveryWithPersistenceDisabled() {
+ public void testDeprecatedUpdateElectionTerm() {
+
+ sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
+
+ assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+ assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testDataRecoveredWithPersistenceDisabled() {
doReturn(false).when(mockPersistence).isRecoveryApplicable();
+ doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
+
+ sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+ assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
+
+ sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+ verify(mockCohort).getRestoreFromSnapshot();
+ verifyNoMoreInteractions(mockCohort);
+
+ verify(mockPersistentProvider).deleteMessages(10L);
+ verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
+ }
+
+ static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
+ return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
+ @Override
+ public boolean matches(Object argument) {
+ UpdateElectionTerm other = (UpdateElectionTerm) argument;
+ return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendValue(new UpdateElectionTerm(term, votedFor));
+ }
+ });
+ }
+
+ @Test
+ public void testNoDataRecoveredWithPersistenceDisabled() {
+ doReturn(false).when(mockPersistence).isRecoveryApplicable();
+
sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
- assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
- assertEquals("Voted For", null, context.getTermInformation().getVotedFor());
+ assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+ assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
sendMessageToSupport(RecoveryCompleted.getInstance(), true);
- verifyNoMoreInteractions(mockCohort);
+ verify(mockCohort).getRestoreFromSnapshot();
+ verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
+ }
+
+ @Test
+ public void testServerConfigurationPayloadApplied() {
+ String follower1 = "follower1";
+ String follower2 = "follower2";
+ String follower3 = "follower3";
+
+ context.addToPeers(follower1, null, VotingState.VOTING);
+ context.addToPeers(follower2, null, VotingState.VOTING);
+
+ //add new Server
+ ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(localId, true),
+ new ServerInfo(follower1, true),
+ new ServerInfo(follower2, false),
+ new ServerInfo(follower3, true)));
+
+ sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
+
+ //verify new peers
+ assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
+ assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
+ Sets.newHashSet(context.getPeerIds()));
+ assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
+ assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
+ assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
+
+ sendMessageToSupport(new ApplyJournalEntries(0));
+
+ verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
+ verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
+
+ //remove existing follower1
+ obj = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(localId, true),
+ new ServerInfo("follower2", true),
+ new ServerInfo("follower3", true)));
+
+ sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
+
+ //verify new peers
+ assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
+ assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
+ }
+
+ @Test
+ public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
+ doReturn(false).when(mockPersistence).isRecoveryApplicable();
+
+ String follower = "follower";
+ ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(localId, true), new ServerInfo(follower, true)));
+
+ sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
+
+ //verify new peers
+ assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
+ }
+
+ @Test
+ public void testOnSnapshotOfferWithServerConfiguration() {
+ long electionTerm = 2;
+ String electionVotedFor = "member-2";
+ ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(localId, true),
+ new ServerInfo("follower1", true),
+ new ServerInfo("follower2", true)));
+
+ Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
+ -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
+
+ SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+ SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+ sendMessageToSupport(snapshotOffer);
+
+ assertEquals("Journal log size", 0, context.getReplicatedLog().size());
+ assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+ assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+ assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
+ assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
+ Sets.newHashSet(context.getPeerIds()));
}
}