package org.opendaylight.controller.cluster.raft;
import akka.actor.ActorRef;
-import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.annotations.VisibleForTesting;
private final RaftActorSnapshotCohort cohort;
private final Logger log;
- private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
- @Override
- public void apply(Void notUsed) {
- cohort.createSnapshot(context.getActor());
- }
- };
-
- private final Procedure<byte[]> applySnapshotProcedure = new Procedure<byte[]>() {
- @Override
- public void apply(byte[] state) {
- cohort.applySnapshot(state);
- }
- };
-
private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS);
RaftActorSnapshotMessageSupport(final RaftActorContext context, final RaftActorSnapshotCohort cohort) {
this.cohort = cohort;
this.log = context.getLogger();
- context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
- context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
+ context.getSnapshotManager().setCreateSnapshotRunnable(() -> cohort.createSnapshot(context.getActor()));
+ context.getSnapshotManager().setApplySnapshotConsumer(cohort::applySnapshot);
}
boolean handleSnapshotMessage(Object message, ActorRef sender) {
package org.opendaylight.controller.cluster.raft;
-import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
private CaptureSnapshot captureSnapshot;
private long lastSequenceNumber = -1;
- private Procedure<Void> createSnapshotProcedure;
+ private Runnable createSnapshotProcedure;
private ApplySnapshot applySnapshot;
- private Procedure<byte[]> applySnapshotProcedure;
+ private Consumer<byte[]> applySnapshotProcedure;
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
return currentState.trimLog(desiredTrimIndex);
}
- public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+ public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
this.createSnapshotProcedure = createSnapshotProcedure;
}
- public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+ public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
this.applySnapshotProcedure = applySnapshotProcedure;
}
SnapshotManager.this.currentState = CREATING;
try {
- createSnapshotProcedure.apply(null);
+ createSnapshotProcedure.run();
} catch (Exception e) {
SnapshotManager.this.currentState = IDLE;
LOG.error("Error creating snapshot", e);
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
if(snapshot.getState().length > 0 ) {
- applySnapshotProcedure.apply(snapshot.getState());
+ applySnapshotProcedure.accept(snapshot.getState());
}
applySnapshot.getCallback().onSuccess();
@Override
public SnapshotManager getSnapshotManager() {
SnapshotManager snapshotManager = super.getSnapshotManager();
- snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
+ snapshotManager.setCreateSnapshotRunnable(() -> { });
return snapshotManager;
}
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
-import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
import java.util.Arrays;
private RaftActorBehavior mockRaftActorBehavior;
@Mock
- private Procedure<Void> mockProcedure;
+ private Runnable mockProcedure;
@Mock
private ElectionTerm mockElectionTerm;
actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
doReturn(actorRef).when(mockRaftActorContext).getActor();
- snapshotManager.setCreateSnapshotCallable(mockProcedure);
+ snapshotManager.setCreateSnapshotRunnable(mockProcedure);
}
@After
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
@Test
public void testCaptureWithCreateProcedureError () throws Exception {
- doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+ doThrow(new RuntimeException("mock")).when(mockProcedure).run();
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
assertEquals(false, snapshotManager.isCapturing());
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
}
@Test
assertTrue(capture);
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
reset(mockProcedure);
assertFalse(capture);
- verify(mockProcedure, never()).apply(null);
+ verify(mockProcedure, never()).run();
}
@Test