X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FMockRaftActor.java;h=20adaf12beff56f0125441c265d75c2533b3c769;hb=5fd4213b5bfaf2db21f1b37139f6b98535a872c0;hp=3ba664be7da313e3294fbc7c97988bf6c82018d6;hpb=bef65394c7f540b601ce4bd360d7d7648f289bd1;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 3ba664be7d..20adaf12be 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -10,14 +10,16 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; + import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -25,8 +27,10 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; @@ -52,9 +56,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, state = new ArrayList<>(); this.actorDelegate = mock(RaftActor.class); this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); - this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); - if(builder.dataPersistenceProvider == null){ + this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort : + mock(RaftActorSnapshotCohort.class); + + if (builder.dataPersistenceProvider == null) { setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true); } else { setPersistence(builder.dataPersistenceProvider); @@ -81,6 +87,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport()); } + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() { return snapshotMessageSupport; } @@ -89,7 +100,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, try { assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); } catch (InterruptedException e) { - e.printStackTrace(); + Throwables.propagate(e); } } @@ -97,14 +108,14 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, try { assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); } catch (InterruptedException e) { - e.printStackTrace(); + Throwables.propagate(e); } } - public void waitUntilLeader(){ - for(int i = 0;i < 10; i++){ - if(isLeader()){ + public void waitUntilLeader() { + for (int i = 0; i < 10; i++) { + if (isLeader()) { break; } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -160,34 +171,43 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] bytes) { - recoveryCohortDelegate.applyRecoverySnapshot(bytes); - applySnapshotBytes(bytes); + public void applyRecoverySnapshot(Snapshot.State newState) { + recoveryCohortDelegate.applyRecoverySnapshot(newState); + applySnapshotState(newState); } - private void applySnapshotBytes(byte[] bytes) { - try { - Object data = toObject(bytes); - if (data instanceof List) { - state.clear(); - state.addAll((List) data); - } - } catch (Exception e) { - e.printStackTrace(); + private void applySnapshotState(Snapshot.State newState) { + if (newState instanceof MockSnapshotState) { + state.clear(); + state.addAll(((MockSnapshotState)newState).getState()); } } @Override - public void createSnapshot(ActorRef actorRef) { + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { LOG.info("{}: createSnapshot called", persistenceId()); - snapshotCohortDelegate.createSnapshot(actorRef); + snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(Snapshot.State newState) { LOG.info("{}: applySnapshot called", persistenceId()); - applySnapshotBytes(snapshot); - snapshotCohortDelegate.applySnapshot(snapshot); + applySnapshotState(newState); + snapshotCohortDelegate.applySnapshot(newState); + } + + @Override + public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) { + try { + return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read()); + } catch (IOException e) { + throw new RuntimeException("Error deserializing state", e); + } + } + + @Override + public Snapshot.State deserializePreCarbonSnapshot(byte[] from) { + return new MockSnapshotState(SerializationUtils.deserialize(from)); } @Override @@ -210,12 +230,12 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected void handleCommand(final Object message) { - if(message instanceof RaftActorBehavior) { + if (message instanceof RaftActorBehavior) { super.changeCurrentBehavior((RaftActorBehavior)message); } else { super.handleCommand(message); - if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) { + if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) { snapshotCommitted.countDown(); } } @@ -223,33 +243,22 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected void pauseLeader(Runnable operation) { - if(pauseLeaderFunction != null) { + if (pauseLeaderFunction != null) { pauseLeaderFunction.apply(operation); } else { super.pauseLeader(operation); } } - public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } + public static List fromState(Snapshot.State from) { + if (from instanceof MockSnapshotState) { + return ((MockSnapshotState)from).getState(); } - return obj; + + throw new IllegalStateException("Unexpected snapshot State: " + from); } - public ReplicatedLog getReplicatedLog(){ + public ReplicatedLog getReplicatedLog() { return this.getRaftActorContext().getReplicatedLog(); } @@ -258,15 +267,14 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return restoreFromSnapshot; } - public static Props props(final String id, final Map peerAddresses, - ConfigParams config){ + public static Props props(final String id, final Map peerAddresses, ConfigParams config) { return builder().id(id).peerAddresses(peerAddresses).config(config).props(); } public static Props props(final String id, final Map peerAddresses, - ConfigParams config, DataPersistenceProvider dataPersistenceProvider){ - return builder().id(id).peerAddresses(peerAddresses).config(config). - dataPersistenceProvider(dataPersistenceProvider).props(); + ConfigParams config, DataPersistenceProvider dataPersistenceProvider) { + return builder().id(id).peerAddresses(peerAddresses).config(config) + .dataPersistenceProvider(dataPersistenceProvider).props(); } public static Builder builder() { @@ -284,6 +292,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, private Optional persistent = Optional.absent(); private final Class actorClass; private Function pauseLeaderFunction; + private RaftActorSnapshotCohort snapshotCohort; protected AbstractBuilder(Class actorClass) { this.actorClass = actorClass; @@ -294,48 +303,53 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return (T) this; } - public T id(String id) { - this.id = id; + public T id(String newId) { + this.id = newId; return self(); } - public T peerAddresses(Map peerAddresses) { - this.peerAddresses = peerAddresses; + public T peerAddresses(Map newPeerAddresses) { + this.peerAddresses = newPeerAddresses; return self(); } - public T config(ConfigParams config) { - this.config = config; + public T config(ConfigParams newConfig) { + this.config = newConfig; return self(); } - public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) { - this.dataPersistenceProvider = dataPersistenceProvider; + public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) { + this.dataPersistenceProvider = newDataPersistenceProvider; return self(); } - public T roleChangeNotifier(ActorRef roleChangeNotifier) { - this.roleChangeNotifier = roleChangeNotifier; + public T roleChangeNotifier(ActorRef newRoleChangeNotifier) { + this.roleChangeNotifier = newRoleChangeNotifier; return self(); } - public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) { - this.snapshotMessageSupport = snapshotMessageSupport; + public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) { + this.snapshotMessageSupport = newSnapshotMessageSupport; return self(); } - public T restoreFromSnapshot(byte[] restoreFromSnapshot) { - this.restoreFromSnapshot = restoreFromSnapshot; + public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) { + this.restoreFromSnapshot = newRestoreFromSnapshot; return self(); } - public T persistent(Optional persistent) { - this.persistent = persistent; + public T persistent(Optional newPersistent) { + this.persistent = newPersistent; return self(); } - public T pauseLeaderFunction(Function pauseLeaderFunction) { - this.pauseLeaderFunction = pauseLeaderFunction; + public T pauseLeaderFunction(Function newPauseLeaderFunction) { + this.pauseLeaderFunction = newPauseLeaderFunction; + return self(); + } + + public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) { + this.snapshotCohort = newSnapshotCohort; return self(); } @@ -349,4 +363,53 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, super(MockRaftActor.class); } } + + public static class MockSnapshotState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + private final List state; + + public MockSnapshotState(List state) { + this.state = state; + } + + public List getState() { + return state; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (state == null ? 0 : state.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + MockSnapshotState other = (MockSnapshotState) obj; + if (state == null) { + if (other.state != null) { + return false; + } + } else if (!state.equals(other.state)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "MockSnapshotState [state=" + state + "]"; + } + } }