Bug 5740: Change TimeoutNow and Shutdown to externalizable proxy
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
index 550504b4006161713ae8d5e166ac9bf169c2417e..ae26383283e00b12b3a4c72eb7c955519f61f937 100644 (file)
@@ -10,13 +10,16 @@ package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
+
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -24,12 +27,14 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
-
     public static final short PAYLOAD_VERSION = 5;
 
     final RaftActor actorDelegate;
@@ -41,17 +46,21 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
     private RaftActorRecoverySupport raftActorRecoverySupport;
     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
-    private final byte[] restoreFromSnapshot;
+    private final Snapshot restoreFromSnapshot;
     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
+    private final Function<Runnable, Void> pauseLeaderFunction;
 
     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
-        super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
+        super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
+            Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
         state = new ArrayList<>();
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
-        this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
 
-        if(builder.dataPersistenceProvider == null){
+        this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
+            mock(RaftActorSnapshotCohort.class);
+
+        if (builder.dataPersistenceProvider == null) {
             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
         } else {
             setPersistence(builder.dataPersistenceProvider);
@@ -60,6 +69,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         roleChangeNotifier = builder.roleChangeNotifier;
         snapshotMessageSupport = builder.snapshotMessageSupport;
         restoreFromSnapshot = builder.restoreFromSnapshot;
+        pauseLeaderFunction = builder.pauseLeaderFunction;
     }
 
     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
@@ -77,6 +87,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
     }
 
+    @Override
+    public RaftActorContext getRaftActorContext() {
+        return super.getRaftActorContext();
+    }
+
     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
         return snapshotMessageSupport;
     }
@@ -85,7 +100,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         try {
             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            Throwables.propagate(e);
         }
     }
 
@@ -93,14 +108,14 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         try {
             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            Throwables.propagate(e);
         }
     }
 
 
-    public void waitUntilLeader(){
-        for(int i = 0;i < 10; i++){
-            if(isLeader()){
+    public void waitUntilLeader() {
+        for (int i = 0; i < 10; i++) {
+            if (isLeader()) {
                 break;
             }
             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -111,8 +126,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return state;
     }
 
-
-    @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+    @Override
+    protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
         actorDelegate.applyState(clientActor, identifier, data);
         LOG.info("{}: applyState called: {}", persistenceId(), data);
 
@@ -156,33 +171,43 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void applyRecoverySnapshot(byte[] bytes) {
-        recoveryCohortDelegate.applyRecoverySnapshot(bytes);
-        applySnapshotBytes(bytes);
+    public void applyRecoverySnapshot(Snapshot.State newState) {
+        recoveryCohortDelegate.applyRecoverySnapshot(newState);
+        applySnapshotState(newState);
     }
 
-    private void applySnapshotBytes(byte[] bytes) {
-        try {
-            Object data = toObject(bytes);
-            if (data instanceof List) {
-                state.addAll((List<?>) data);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
+    private void applySnapshotState(Snapshot.State newState) {
+        if (newState instanceof MockSnapshotState) {
+            state.clear();
+            state.addAll(((MockSnapshotState)newState).getState());
         }
     }
 
     @Override
-    public void createSnapshot(ActorRef actorRef) {
+    public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
         LOG.info("{}: createSnapshot called", persistenceId());
-        snapshotCohortDelegate.createSnapshot(actorRef);
+        snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
     }
 
     @Override
-    public void applySnapshot(byte [] snapshot) {
+    public void applySnapshot(Snapshot.State newState) {
         LOG.info("{}: applySnapshot called", persistenceId());
-        applySnapshotBytes(snapshot);
-        snapshotCohortDelegate.applySnapshot(snapshot);
+        applySnapshotState(newState);
+        snapshotCohortDelegate.applySnapshot(newState);
+    }
+
+    @Override
+    public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+        try {
+            return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
+        } catch (IOException e) {
+            throw new RuntimeException("Error deserializing state", e);
+        }
+    }
+
+    @Override
+    public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
+        return new MockSnapshotState(SerializationUtils.deserialize(from));
     }
 
     @Override
@@ -204,55 +229,52 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void handleCommand(final Object message) {
-        if(message instanceof RaftActorBehavior) {
+    protected void handleCommand(final Object message) {
+        if (message instanceof RaftActorBehavior) {
             super.changeCurrentBehavior((RaftActorBehavior)message);
         } else {
             super.handleCommand(message);
 
-            if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
+            if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
                 snapshotCommitted.countDown();
             }
         }
     }
 
-    public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
-        Object obj = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream ois = null;
-        try {
-            bis = new ByteArrayInputStream(bs);
-            ois = new ObjectInputStream(bis);
-            obj = ois.readObject();
-        } finally {
-            if (bis != null) {
-                bis.close();
-            }
-            if (ois != null) {
-                ois.close();
-            }
+    @Override
+    protected void pauseLeader(Runnable operation) {
+        if (pauseLeaderFunction != null) {
+            pauseLeaderFunction.apply(operation);
+        } else {
+            super.pauseLeader(operation);
         }
-        return obj;
     }
 
-    public ReplicatedLog getReplicatedLog(){
+    public static List<Object> fromState(Snapshot.State from) {
+        if (from instanceof MockSnapshotState) {
+            return ((MockSnapshotState)from).getState();
+        }
+
+        throw new IllegalStateException("Unexpected snapshot State: " + from);
+    }
+
+    public ReplicatedLog getReplicatedLog() {
         return this.getRaftActorContext().getReplicatedLog();
     }
 
     @Override
-    public byte[] getRestoreFromSnapshot() {
+    public Snapshot getRestoreFromSnapshot() {
         return restoreFromSnapshot;
     }
 
-    public static Props props(final String id, final Map<String, String> peerAddresses,
-            ConfigParams config){
+    public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
     }
 
     public static Props props(final String id, final Map<String, String> peerAddresses,
-                              ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
-        return builder().id(id).peerAddresses(peerAddresses).config(config).
-                dataPersistenceProvider(dataPersistenceProvider).props();
+                              ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
+        return builder().id(id).peerAddresses(peerAddresses).config(config)
+                .dataPersistenceProvider(dataPersistenceProvider).props();
     }
 
     public static Builder builder() {
@@ -266,9 +288,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         private DataPersistenceProvider dataPersistenceProvider;
         private ActorRef roleChangeNotifier;
         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
-        private byte[] restoreFromSnapshot;
+        private Snapshot restoreFromSnapshot;
         private Optional<Boolean> persistent = Optional.absent();
         private final Class<A> actorClass;
+        private Function<Runnable, Void> pauseLeaderFunction;
+        private RaftActorSnapshotCohort snapshotCohort;
 
         protected AbstractBuilder(Class<A> actorClass) {
             this.actorClass = actorClass;
@@ -279,43 +303,53 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             return (T) this;
         }
 
-        public T id(String id) {
-            this.id = id;
+        public T id(String newId) {
+            this.id = newId;
             return self();
         }
 
-        public T peerAddresses(Map<String, String> peerAddresses) {
-            this.peerAddresses = peerAddresses;
+        public T peerAddresses(Map<String, String> newPeerAddresses) {
+            this.peerAddresses = newPeerAddresses;
             return self();
         }
 
-        public T config(ConfigParams config) {
-            this.config = config;
+        public T config(ConfigParams newConfig) {
+            this.config = newConfig;
             return self();
         }
 
-        public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
-            this.dataPersistenceProvider = dataPersistenceProvider;
+        public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
+            this.dataPersistenceProvider = newDataPersistenceProvider;
             return self();
         }
 
-        public T roleChangeNotifier(ActorRef roleChangeNotifier) {
-            this.roleChangeNotifier = roleChangeNotifier;
+        public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
+            this.roleChangeNotifier = newRoleChangeNotifier;
             return self();
         }
 
-        public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
-            this.snapshotMessageSupport = snapshotMessageSupport;
+        public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
+            this.snapshotMessageSupport = newSnapshotMessageSupport;
             return self();
         }
 
-        public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
-            this.restoreFromSnapshot = restoreFromSnapshot;
+        public T restoreFromSnapshot(Snapshot newRestoreFromSnapshot) {
+            this.restoreFromSnapshot = newRestoreFromSnapshot;
             return self();
         }
 
-        public T persistent(Optional<Boolean> persistent) {
-            this.persistent = persistent;
+        public T persistent(Optional<Boolean> newPersistent) {
+            this.persistent = newPersistent;
+            return self();
+        }
+
+        public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
+            this.pauseLeaderFunction = newPauseLeaderFunction;
+            return self();
+        }
+
+        public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
+            this.snapshotCohort = newSnapshotCohort;
             return self();
         }
 
@@ -329,4 +363,53 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             super(MockRaftActor.class);
         }
     }
+
+    public static class MockSnapshotState implements Snapshot.State {
+        private static final long serialVersionUID = 1L;
+
+        private final List<Object> state;
+
+        public MockSnapshotState(List<Object> state) {
+            this.state = state;
+        }
+
+        public List<Object> getState() {
+            return state;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (state == null ? 0 : state.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            MockSnapshotState other = (MockSnapshotState) obj;
+            if (state == null) {
+                if (other.state != null) {
+                    return false;
+                }
+            } else if (!state.equals(other.state)) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "MockSnapshotState [state=" + state + "]";
+        }
+    }
 }