import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
InMemorySnapshotStore.clear();
}
- public static class MockRaftActor extends RaftActor {
+ public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
- protected DataPersistenceProvider dataPersistenceProvider;
- private final RaftActor delegate;
+ private final RaftActor actorDelegate;
+ private final RaftActorRecoveryCohort recoveryCohortDelegate;
+ private final RaftActorSnapshotCohort snapshotCohortDelegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
- this.delegate = mock(RaftActor.class);
+ this.actorDelegate = mock(RaftActor.class);
+ this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+ this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
if(dataPersistenceProvider == null){
- this.dataPersistenceProvider = new PersistentDataProvider();
+ setPersistence(true);
} else {
- this.dataPersistenceProvider = dataPersistenceProvider;
+ setPersistence(dataPersistenceProvider);
}
}
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
- delegate.applyState(clientActor, identifier, data);
+ actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called", persistenceId());
}
@Override
- protected void startLogRecoveryBatch(int maxBatchSize) {
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return this;
}
@Override
- protected void appendRecoveredLogEntry(Payload data) {
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return this;
+ }
+
+ @Override
+ public void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ public void appendRecoveredLogEntry(Payload data) {
state.add(data);
}
@Override
- protected void applyCurrentLogRecoveryBatch() {
+ public void applyCurrentLogRecoveryBatch() {
}
@Override
protected void onRecoveryComplete() {
- delegate.onRecoveryComplete();
+ actorDelegate.onRecoveryComplete();
recoveryComplete.countDown();
}
}
@Override
- protected void applyRecoverySnapshot(byte[] bytes) {
- delegate.applyRecoverySnapshot(bytes);
+ public void applyRecoverySnapshot(byte[] bytes) {
+ recoveryCohortDelegate.applyRecoverySnapshot(bytes);
try {
Object data = toObject(bytes);
if (data instanceof List) {
}
}
- @Override protected void createSnapshot() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
LOG.info("{}: createSnapshot called", persistenceId());
- delegate.createSnapshot();
+ snapshotCohortDelegate.createSnapshot(actorRef);
}
- @Override protected void applySnapshot(byte [] snapshot) {
+ @Override
+ public void applySnapshot(byte [] snapshot) {
LOG.info("{}: applySnapshot called", persistenceId());
- delegate.applySnapshot(snapshot);
- }
-
- @Override protected void onStateChanged() {
- delegate.onStateChanged();
+ snapshotCohortDelegate.applySnapshot(snapshot);
}
@Override
- protected DataPersistenceProvider persistence() {
- return this.dataPersistenceProvider;
+ protected void onStateChanged() {
+ actorDelegate.onStateChanged();
}
@Override
public ReplicatedLog getReplicatedLog(){
return this.getRaftActorContext().getReplicatedLog();
}
-
}
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+ verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
assertEquals("remove log entries", 1, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+ mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+ verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
assertEquals("remove log entries", 0, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+ mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
- verify(mockRaftActor.delegate).createSnapshot();
+ verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
- verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+ verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
}
};
mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
- verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
+ verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
assertTrue("The replicatedLog should have changed",
oldReplicatedLog != mockRaftActor.getReplicatedLog());
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
- new NonPersistentProvider()), persistenceId);
+ new NonPersistentDataProvider()), persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
.capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
new MockRaftActorContext.MockPayload("x")), 4);
- verify(leaderActor.delegate).createSnapshot();
+ verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
, snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("D")), 4);
- verify(followerActor.delegate).createSnapshot();
+ verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
assertEquals(6, followerActor.getReplicatedLog().size());
assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
};
}
-
- private static class NonPersistentProvider implements DataPersistenceProvider {
- @Override
- public boolean isRecoveryApplicable() {
- return false;
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- try {
- procedure.apply(o);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void saveSnapshot(Object o) {
-
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
-
- }
- }
-
@Test
public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
new JavaTestKit(getSystem()) {{
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
Map<String, String> peerAddresses = new HashMap<>();
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
Map<String, String> peerAddresses = new HashMap<>();