Reset snapshot progress after timeout has been hit
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActorContext.java
index cdcaadf8151999f8df9a5a9961285b36944daaa6..8c17e1e8e8e9d4060221f8c68b2a980f3f6b1b32 100644 (file)
@@ -12,14 +12,21 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import akka.japi.Procedure;
-import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
@@ -30,6 +37,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
 
     private ActorSystem system;
     private RaftPolicy raftPolicy;
+    private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
 
     private static ElectionTerm newElectionTerm() {
         return new ElectionTerm() {
@@ -47,29 +55,34 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             }
 
             @Override
-            public void update(long newTerm, String newVotedFor) {
+            public void update(final long newTerm, final String newVotedFor) {
                 this.currentTerm = newTerm;
                 this.votedFor = newVotedFor;
 
                 // TODO : Write to some persistent state
             }
 
-            @Override public void updateAndPersist(long newTerm, String newVotedFor) {
+            @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
                 update(newTerm, newVotedFor);
             }
         };
     }
 
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
+                MoreExecutors.directExecutor());
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
-    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
+    public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-            new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
-            applyState -> actor.tell(applyState, actor), LOG);
+            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
+            MoreExecutors.directExecutor());
 
         this.system = system;
 
@@ -87,11 +100,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         setLastApplied(replicatedLog.lastIndex());
     }
 
-    @Override public ActorRef actorOf(Props props) {
+    @Override public ActorRef actorOf(final Props props) {
         return system.actorOf(props);
     }
 
-    @Override public ActorSelection actorSelection(String path) {
+    @Override public ActorSelection actorSelection(final String path) {
         return system.actorSelection(path);
     }
 
@@ -99,7 +112,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         return this.system;
     }
 
-    @Override public ActorSelection getPeerActorSelection(String peerId) {
+    @Override public ActorSelection getPeerActorSelection(final String peerId) {
         String peerAddress = getPeerAddress(peerId);
         if (peerAddress != null) {
             return actorSelection(peerAddress);
@@ -107,7 +120,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         return null;
     }
 
-    public void setPeerAddresses(Map<String, String> peerAddresses) {
+    public void setPeerAddresses(final Map<String, String> peerAddresses) {
         for (String id: getPeerIds()) {
             removePeer(id);
         }
@@ -120,16 +133,36 @@ public class MockRaftActorContext extends RaftActorContextImpl {
     @Override
     public SnapshotManager getSnapshotManager() {
         SnapshotManager snapshotManager = super.getSnapshotManager();
-        snapshotManager.setCreateSnapshotRunnable(() -> { });
+        snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
+
+        snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
+            @Override
+            public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
+                return ByteState.of(snapshotBytes.read());
+            }
+
+            @Override
+            public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
+            }
+
+            @Override
+            public void applySnapshot(final State snapshotState) {
+            }
+        });
+
         return snapshotManager;
     }
 
+    public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+        this.createSnapshotProcedure = createSnapshotProcedure;
+    }
+
     @Override
     public RaftPolicy getRaftPolicy() {
         return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
     }
 
-    public void setRaftPolicy(RaftPolicy raftPolicy) {
+    public void setRaftPolicy(final RaftPolicy raftPolicy) {
         this.raftPolicy = raftPolicy;
     }
 
@@ -140,31 +173,27 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         }
 
         @Override
-        public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+        public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
         }
 
         @Override
-        public boolean shouldCaptureSnapshot(long logIndex) {
+        public boolean shouldCaptureSnapshot(final long logIndex) {
             return false;
         }
 
         @Override
-        public boolean removeFromAndPersist(long index) {
+        public boolean removeFromAndPersist(final long index) {
             return removeFrom(index) >= 0;
         }
 
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
-        public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
-                boolean doAsync) {
+        public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+                final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
             append(replicatedLogEntry);
 
             if (callback != null) {
-                try {
-                    callback.apply(replicatedLogEntry);
-                } catch (Exception e) {
-                    Throwables.propagate(e);
-                }
+                callback.accept(replicatedLogEntry);
             }
 
             return true;
@@ -179,12 +208,12 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         public MockPayload() {
         }
 
-        public MockPayload(String data) {
+        public MockPayload(final String data) {
             this.value = data;
             size = value.length();
         }
 
-        public MockPayload(String data, int size) {
+        public MockPayload(final String data, final int size) {
             this(data);
             this.size = size;
         }
@@ -208,7 +237,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         }
 
         @Override
-        public boolean equals(Object obj) {
+        public boolean equals(final Object obj) {
             if (this == obj) {
                 return true;
             }
@@ -233,7 +262,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
     public static class MockReplicatedLogBuilder {
         private final ReplicatedLog mockLog = new SimpleReplicatedLog();
 
-        public  MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+        public  MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
             for (int i = start; i < end; i++) {
                 this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
                         new MockRaftActorContext.MockPayload(Integer.toString(i))));
@@ -241,7 +270,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             return this;
         }
 
-        public  MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
+        public  MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
             this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
             return this;
         }