import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import akka.japi.Procedure;
+
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.collect.Sets;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.SerializationUtils;
import org.hamcrest.Description;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Mock
private DataPersistenceProvider mockPersistence;
- @Mock
- private RaftActorBehavior mockBehavior;
@Mock
private RaftActorRecoveryCohort mockCohort;
+ @Mock
+ private RaftActorSnapshotCohort mockSnapshotCohort;
+
@Mock
PersistentDataProvider mockPersistentProvider;
public void setup() {
MockitoAnnotations.initMocks(this);
- context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
- -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+ context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
+ LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
+ mockPersistence, applyState -> { }, LOG);
- support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
+ support = new RaftActorRecoverySupport(context, mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
private void sendMessageToSupport(Object message) {
@Test
public void testOnReplicatedLogEntry() {
- MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
- 1, new MockRaftActorContext.MockPayload("1", 5));
+ ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
sendMessageToSupport(logEntry);
configParams.setJournalRecoveryLogBatchSize(5);
ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 0, new MockRaftActorContext.MockPayload("0")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 1, new MockRaftActorContext.MockPayload("1")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 2, new MockRaftActorContext.MockPayload("2")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 3, new MockRaftActorContext.MockPayload("3")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 4, new MockRaftActorContext.MockPayload("4")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 5, new MockRaftActorContext.MockPayload("5")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
sendMessageToSupport(new ApplyJournalEntries(2));
InOrder inOrder = Mockito.inOrder(mockCohort);
inOrder.verify(mockCohort).startLogRecoveryBatch(5);
- for(int i = 0; i < replicatedLog.size() - 1; i++) {
+ for (int i = 0; i < replicatedLog.size() - 1; i++) {
inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
}
}
@Test
- public void testOnApplyLogEntries() {
+ public void testOnSnapshotOffer() {
+
ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 0, new MockRaftActorContext.MockPayload("0")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
- sendMessageToSupport(new ApplyLogEntries(0));
+ ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
+ new MockRaftActorContext.MockPayload("4", 4));
- assertEquals("Last applied", 0, context.getLastApplied());
- assertEquals("Commit index", 0, context.getCommitIndex());
- }
+ ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
+ new MockRaftActorContext.MockPayload("5", 5));
- @Test
- public void testOnSnapshotOffer() {
+ long lastAppliedDuringSnapshotCapture = 3;
+ long lastIndexDuringSnapshotCapture = 5;
+ long electionTerm = 2;
+ String electionVotedFor = "member-2";
- ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 1, new MockRaftActorContext.MockPayload("1")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 2, new MockRaftActorContext.MockPayload("2")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 3, new MockRaftActorContext.MockPayload("3")));
+ MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+ Snapshot snapshot = Snapshot.create(snapshotState,
+ Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
+ lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
+
+ SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+ SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+ sendMessageToSupport(snapshotOffer);
+
+ assertEquals("Journal log size", 2, context.getReplicatedLog().size());
+ assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
+ assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
+ assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+ assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+ assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+ assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
- byte[] snapshotBytes = {1,2,3,4,5};
+ verify(mockCohort).applyRecoverySnapshot(snapshotState);
+ }
- ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
- 4, new MockRaftActorContext.MockPayload("4", 4));
+ @Deprecated
+ @Test
+ public void testOnSnapshotOfferWithPreCarbonSnapshot() {
- ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
- 5, new MockRaftActorContext.MockPayload("5", 5));
+ ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
+ new MockRaftActorContext.MockPayload("4", 4));
+
+ ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
+ new MockRaftActorContext.MockPayload("5", 5));
long lastAppliedDuringSnapshotCapture = 3;
long lastIndexDuringSnapshotCapture = 5;
long electionTerm = 2;
String electionVotedFor = "member-2";
- Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
- lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
+ List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
+ final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
+
+ org.opendaylight.controller.cluster.raft.Snapshot snapshot = org.opendaylight.controller.cluster.raft.Snapshot
+ .create(SerializationUtils.serialize((Serializable) snapshotData),
+ Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
+ lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+ doAnswer(invocation -> new MockSnapshotState(SerializationUtils.deserialize(
+ invocation.getArgumentAt(0, byte[].class))))
+ .when(mockCohort).deserializePreCarbonSnapshot(any(byte[].class));
+
sendMessageToSupport(snapshotOffer);
assertEquals("Journal log size", 2, context.getReplicatedLog().size());
assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
- verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
+ verify(mockCohort).applyRecoverySnapshot(snapshotState);
}
@Test
public void testOnRecoveryCompletedWithRemainingBatch() {
ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 0, new MockRaftActorContext.MockPayload("0")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 1, new MockRaftActorContext.MockPayload("1")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
sendMessageToSupport(new ApplyJournalEntries(1));
InOrder inOrder = Mockito.inOrder(mockCohort);
inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
- for(int i = 0; i < replicatedLog.size(); i++) {
+ for (int i = 0; i < replicatedLog.size(); i++) {
inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
}
verifyNoMoreInteractions(mockCohort);
}
- @Test
- public void testOnDeprecatedDeleteEntries() {
- ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 0, new MockRaftActorContext.MockPayload("0")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 1, new MockRaftActorContext.MockPayload("1")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 2, new MockRaftActorContext.MockPayload("2")));
-
- sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
-
- assertEquals("Journal log size", 1, context.getReplicatedLog().size());
- assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
- }
-
@Test
public void testOnDeleteEntries() {
ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 0, new MockRaftActorContext.MockPayload("0")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 1, new MockRaftActorContext.MockPayload("1")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 2, new MockRaftActorContext.MockPayload("2")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
sendMessageToSupport(new DeleteEntries(1));
assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
}
- @Test
- public void testDeprecatedUpdateElectionTerm() {
-
- sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
-
- assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
- assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
- }
-
- @SuppressWarnings("unchecked")
@Test
public void testDataRecoveredWithPersistenceDisabled() {
+ doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
doReturn(false).when(mockPersistence).isRecoveryApplicable();
doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
- sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
-
- Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+ Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
+ Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
sendMessageToSupport(snapshotOffer);
- sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 4, new MockRaftActorContext.MockPayload("4")));
- sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 5, new MockRaftActorContext.MockPayload("5")));
+ sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
+ sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
+ sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
sendMessageToSupport(new ApplyJournalEntries(4));
sendMessageToSupport(new DeleteEntries(5));
- sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
-
assertEquals("Journal log size", 0, context.getReplicatedLog().size());
assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
assertEquals("Last applied", -1, context.getLastApplied());
sendMessageToSupport(RecoveryCompleted.getInstance(), true);
- verify(mockCohort).getRestoreFromSnapshot();
+ verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
+ verify(mockCohort, never()).getRestoreFromSnapshot();
verifyNoMoreInteractions(mockCohort);
verify(mockPersistentProvider).deleteMessages(10L);
- verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
- verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
}
static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
new ServerInfo(follower2, false),
new ServerInfo(follower3, true)));
- sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
+ sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
//verify new peers
assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
new ServerInfo("follower2", true),
new ServerInfo("follower3", true)));
- sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
+ sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
//verify new peers
assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(localId, true), new ServerInfo(follower, true)));
- sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
+ sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
//verify new peers
assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
new ServerInfo("follower1", true),
new ServerInfo("follower2", true)));
- Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
+ MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+ Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
-1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);