Mechanical code cleanup (sal-akka-raft)
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 7109980f3d5f33ca735853544b42ad718985e99b..3fc43c7fd1457c126b4cf4cf40285844401341e9 100644 (file)
@@ -8,10 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -37,10 +37,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Procedure<Void> createSnapshotProcedure;
+    private Runnable createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Procedure<byte[]> applySnapshotProcedure;
+    private Consumer<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -77,8 +77,8 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void commit(final long sequenceNumber) {
-        currentState.commit(sequenceNumber);
+    public void commit(final long sequenceNumber, long timeStamp) {
+        currentState.commit(sequenceNumber, timeStamp);
     }
 
     @Override
@@ -91,11 +91,11 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
         this.applySnapshotProcedure = applySnapshotProcedure;
     }
 
@@ -177,7 +177,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void commit(final long sequenceNumber) {
+        public void commit(final long sequenceNumber, long timeStamp) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -250,7 +250,7 @@ public class SnapshotManager implements SnapshotState {
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.apply(null);
+                createSnapshotProcedure.run();
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 LOG.error("Error creating snapshot", e);
@@ -385,7 +385,7 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(final long sequenceNumber) {
+        public void commit(final long sequenceNumber, long timeStamp) {
             LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
             if(applySnapshot != null) {
@@ -398,8 +398,12 @@ public class SnapshotManager implements SnapshotState {
                     context.setCommitIndex(snapshot.getLastAppliedIndex());
                     context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
+                    if(snapshot.getServerConfiguration() != null) {
+                        context.updatePeerIds(snapshot.getServerConfiguration());
+                    }
+
                     if(snapshot.getState().length > 0 ) {
-                        applySnapshotProcedure.apply(snapshot.getState());
+                        applySnapshotProcedure.accept(snapshot.getState());
                     }
 
                     applySnapshot.getCallback().onSuccess();
@@ -410,8 +414,8 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotCommit();
             }
 
-            context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
-                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
+            context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(sequenceNumber,
+                    timeStamp - 1, 0L, 0L));
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
@@ -451,7 +455,7 @@ public class SnapshotManager implements SnapshotState {
 
     }
 
-    private static interface TermInformationReader {
+    private interface TermInformationReader {
         long getIndex();
         long getTerm();
     }