(configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
delegatingPersistenceProvider, LOG);
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
}
@Override
}
protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
- return new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
- getRaftActorRecoveryCohort());
+ return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
}
protected void initializeBehavior(){
}
protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
- return new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
- currentBehavior, getRaftActorSnapshotCohort());
+ return new RaftActorSnapshotMessageSupport(context, currentBehavior,
+ getRaftActorSnapshotCohort());
}
private void onGetOnDemandRaftStats() {
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
* @author Thomas Pantelis
*/
class RaftActorRecoverySupport {
- private final DataPersistenceProvider persistence;
private final RaftActorContext context;
private final RaftActorBehavior currentBehavior;
private final RaftActorRecoveryCohort cohort;
private Stopwatch recoveryTimer;
private final Logger log;
- RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
- RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
- this.persistence = persistence;
+ RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
+ RaftActorRecoveryCohort cohort) {
this.context = context;
this.currentBehavior = currentBehavior;
this.cohort = cohort;
boolean handleRecoveryMessage(Object message) {
boolean recoveryComplete = false;
- if(persistence.isRecoveryApplicable()) {
+ if(context.getPersistenceProvider().isRecoveryApplicable()) {
if (message instanceof SnapshotOffer) {
onRecoveredSnapshot((SnapshotOffer) message);
} else if (message instanceof ReplicatedLogEntry) {
// The replicated log can be used later on to retrieve this snapshot
// when we need to install it on a peer
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
class RaftActorSnapshotMessageSupport {
static final String COMMIT_SNAPSHOT = "commit_snapshot";
- private final DataPersistenceProvider persistence;
private final RaftActorContext context;
private final RaftActorBehavior currentBehavior;
private final RaftActorSnapshotCohort cohort;
}
};
- RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
- RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
- this.persistence = persistence;
+ RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
+ RaftActorSnapshotCohort cohort) {
this.context = context;
this.currentBehavior = currentBehavior;
this.cohort = cohort;
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
return true;
} else if (message.equals(COMMIT_SNAPSHOT)) {
- context.getSnapshotManager().commit(persistence, -1);
+ context.getSnapshotManager().commit(-1);
return true;
} else {
return false;
private void onCaptureSnapshotReply(byte[] snapshotBytes) {
log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
- context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
+ context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
}
private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
long sequenceNumber = success.metadata().sequenceNr();
- context.getSnapshotManager().commit(persistence, sequenceNumber);
+ context.getSnapshotManager().commit(sequenceNumber);
}
private void onApplySnapshot(Snapshot snapshot) {
cohort.applySnapshot(snapshot.getState());
//clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
- currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
context.setLastApplied(snapshot.getLastAppliedIndex());
}
}
import akka.japi.Procedure;
import java.util.Collections;
import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
private long dataSizeSinceLastSnapshot = 0L;
private final RaftActorContext context;
- private final DataPersistenceProvider persistence;
private final RaftActorBehavior currentBehavior;
private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
};
static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
- DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ RaftActorBehavior currentBehavior) {
return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
- snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+ snapshot.getUnAppliedEntries(), context, currentBehavior);
}
- static ReplicatedLog newInstance(RaftActorContext context,
- DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ static ReplicatedLog newInstance(RaftActorContext context, RaftActorBehavior currentBehavior) {
return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
- persistence, currentBehavior);
+ currentBehavior);
}
private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
- RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ RaftActorContext context, RaftActorBehavior currentBehavior) {
super(snapshotIndex, snapshotTerm, unAppliedEntries);
this.context = context;
- this.persistence = persistence;
this.currentBehavior = currentBehavior;
}
// FIXME: Maybe this should be done after the command is saved
long adjustedIndex = removeFrom(logEntryIndex);
if(adjustedIndex >= 0) {
- persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+ context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
}
}
// persist call and the execution(s) of the associated event
// handler. This also holds for multiple persist calls in context
// of a single command.
- persistence.persist(replicatedLogEntry,
+ context.getPersistenceProvider().persist(replicatedLogEntry,
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
}
@Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
- RaftActorBehavior currentBehavior, long totalMemory) {
- currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
+ public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+ currentState.persist(snapshotBytes, currentBehavior, totalMemory);
}
@Override
- public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
- currentState.commit(persistenceProvider, sequenceNumber);
+ public void commit(long sequenceNumber) {
+ currentState.commit(sequenceNumber);
}
@Override
}
@Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
- RaftActorBehavior currentBehavior, long totalMemory) {
+ public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
LOG.debug("persist should not be called in state {}", this);
}
@Override
- public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ public void commit(long sequenceNumber) {
LOG.debug("commit should not be called in state {}", this);
}
}
@Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
- RaftActorBehavior currentBehavior, long totalMemory) {
+ public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
- persistenceProvider.saveSnapshot(sn);
+ context.getPersistenceProvider().saveSnapshot(sn);
LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
private class Persisting extends AbstractSnapshotState {
@Override
- public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ public void commit(long sequenceNumber) {
context.getReplicatedLog().snapshotCommit();
- persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+ context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
- persistenceProvider.deleteMessages(lastSequenceNumber);
+ context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
lastSequenceNumber = -1;
SnapshotManager.this.currentState = IDLE;
package org.opendaylight.controller.cluster.raft;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
public interface SnapshotState {
/**
* Persist the snapshot
*
- * @param persistenceProvider
* @param snapshotBytes
* @param currentBehavior
* @param totalMemory
*/
- void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
- ,long totalMemory);
+ void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory);
/**
* Commit the snapshot by trimming the log
*
- * @param persistenceProvider
* @param sequenceNumber
*/
- void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+ void commit(long sequenceNumber);
/**
* Rollback the snapshot
context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
-1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
- support = new RaftActorRecoverySupport(mockPersistence, context , mockBehavior, mockCohort);
+ support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
}
private void sendMessageToSupport(Object message) {
}
};
- support = new RaftActorSnapshotMessageSupport(mockPersistence, context, mockBehavior, mockCohort);
+ support = new RaftActorSnapshotMessageSupport(context, mockBehavior, mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
}
private void sendMessageToSupport(Object message) {
byte[] snapshot = {1,2,3,4,5};
sendMessageToSupport(new CaptureSnapshotReply(snapshot));
- verify(mockSnapshotManager).persist(same(mockPersistence), same(snapshot), same(mockBehavior), anyLong());
+ verify(mockSnapshotManager).persist(same(snapshot), same(mockBehavior), anyLong());
}
@Test
long sequenceNumber = 100;
sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
- verify(mockSnapshotManager).commit(mockPersistence, sequenceNumber);
+ verify(mockSnapshotManager).commit(sequenceNumber);
}
@Test
sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
- verify(mockSnapshotManager).commit(mockPersistence, -1);
+ verify(mockSnapshotManager).commit(-1);
}
@Test
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
- , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
+ leader, Runtime.getRuntime().totalMemory());
assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
@SuppressWarnings("unchecked")
@Test
public void testAppendAndPersistExpectingNoCapture() throws Exception {
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
}
});
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
int dataSize = 600;
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
@Test
public void testRemoveFromAndPersist() throws Exception {
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
snapshotManager.capture(lastLogEntry, -1);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
new MockRaftActorContext.MockPayload()), 9);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
@Test
public void testCallingPersistWithoutCaptureWillDoNothing(){
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testCommitBeforeCapture(){
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, never()).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, times(1)).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();