X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FMockRaftActor.java;h=bafcc55bc7b71f957bd1c8b529631b08808f7898;hb=88a7f904602133bb803752848bb58c9b0a3e9792;hp=38650e834f1eb015a0831a6078dd88bf904a22ba;hpb=11dadddb4d9ba26ae0b1795921c7a218a6d893c2;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 38650e834f..bafcc55bc7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -10,26 +10,30 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; + import akka.actor.ActorRef; import akka.actor.Props; -import com.google.common.base.Optional; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; +import java.util.function.Function; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.messages.Payload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.yangtools.concepts.Identifier; public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { - public static final short PAYLOAD_VERSION = 5; final RaftActor actorDelegate; @@ -41,17 +45,21 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); private RaftActorRecoverySupport raftActorRecoverySupport; private RaftActorSnapshotMessageSupport snapshotMessageSupport; - private final byte[] restoreFromSnapshot; + private final Snapshot restoreFromSnapshot; final CountDownLatch snapshotCommitted = new CountDownLatch(1); + private final Function pauseLeaderFunction; - protected MockRaftActor(Builder builder) { - super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION); - state = new ArrayList<>(); - this.actorDelegate = mock(RaftActor.class); - this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); - this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); + protected MockRaftActor(final AbstractBuilder builder) { + super(builder.id, builder.peerAddresses != null ? builder.peerAddresses : + Collections.emptyMap(), Optional.ofNullable(builder.config), PAYLOAD_VERSION); + state = Collections.synchronizedList(new ArrayList<>()); + actorDelegate = mock(RaftActor.class); + recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); - if(builder.dataPersistenceProvider == null){ + snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort : + mock(RaftActorSnapshotCohort.class); + + if (builder.dataPersistenceProvider == null) { setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true); } else { setPersistence(builder.dataPersistenceProvider); @@ -60,9 +68,10 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, roleChangeNotifier = builder.roleChangeNotifier; snapshotMessageSupport = builder.snapshotMessageSupport; restoreFromSnapshot = builder.restoreFromSnapshot; + pauseLeaderFunction = builder.pauseLeaderFunction; } - public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) { + public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) { raftActorRecoverySupport = support; } @@ -77,6 +86,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport()); } + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() { return snapshotMessageSupport; } @@ -85,7 +99,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, try { assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException(e); } } @@ -93,14 +107,14 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, try { assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException(e); } } - public void waitUntilLeader(){ - for(int i = 0;i < 10; i++){ - if(isLeader()){ + public void waitUntilLeader() { + for (int i = 0; i < 10; i++) { + if (isLeader()) { break; } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -111,8 +125,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return state; } - - @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { + @Override + protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { actorDelegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called: {}", persistenceId(), data); @@ -120,7 +134,6 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { return this; } @@ -131,11 +144,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void startLogRecoveryBatch(int maxBatchSize) { + public void startLogRecoveryBatch(final int maxBatchSize) { } @Override - public void appendRecoveredLogEntry(Payload data) { + public void appendRecoveredLogEntry(final Payload data) { state.add(data); } @@ -156,33 +169,38 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] bytes) { - recoveryCohortDelegate.applyRecoverySnapshot(bytes); - applySnapshotBytes(bytes); + public void applyRecoverySnapshot(final Snapshot.State newState) { + recoveryCohortDelegate.applyRecoverySnapshot(newState); + applySnapshotState(newState); } - private void applySnapshotBytes(byte[] bytes) { - try { - Object data = toObject(bytes); - if (data instanceof List) { - state.addAll((List) data); - } - } catch (Exception e) { - e.printStackTrace(); + private void applySnapshotState(final Snapshot.State newState) { + if (newState instanceof MockSnapshotState mockState) { + state.clear(); + state.addAll(mockState.getState()); } } @Override - public void createSnapshot(ActorRef actorRef) { + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { LOG.info("{}: createSnapshot called", persistenceId()); - snapshotCohortDelegate.createSnapshot(actorRef); + snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(final Snapshot.State newState) { LOG.info("{}: applySnapshot called", persistenceId()); - applySnapshotBytes(snapshot); - snapshotCohortDelegate.applySnapshot(snapshot); + applySnapshotState(newState); + snapshotCohortDelegate.applySnapshot(newState); + } + + @Override + public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) { + try { + return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read()); + } catch (IOException e) { + throw new RuntimeException("Error deserializing state", e); + } } @Override @@ -192,125 +210,192 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected Optional getRoleChangeNotifier() { - return Optional.fromNullable(roleChangeNotifier); + return Optional.ofNullable(roleChangeNotifier); } @Override public String persistenceId() { - return this.getId(); + return getId(); } - protected void newBehavior(RaftActorBehavior newBehavior) { + protected void newBehavior(final RaftActorBehavior newBehavior) { self().tell(newBehavior, ActorRef.noSender()); } @Override - public void handleCommand(final Object message) { - if(message instanceof RaftActorBehavior) { + protected void handleCommand(final Object message) { + if (message instanceof RaftActorBehavior) { super.changeCurrentBehavior((RaftActorBehavior)message); } else { super.handleCommand(message); - if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) { + if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) { snapshotCommitted.countDown(); } } } - public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } + @Override + protected void pauseLeader(final Runnable operation) { + if (pauseLeaderFunction != null) { + pauseLeaderFunction.apply(operation); + } else { + super.pauseLeader(operation); } - return obj; } - public ReplicatedLog getReplicatedLog(){ - return this.getRaftActorContext().getReplicatedLog(); + public static List fromState(final Snapshot.State from) { + if (from instanceof MockSnapshotState mockState) { + return mockState.getState(); + } + + throw new IllegalStateException("Unexpected snapshot State: " + from); + } + + public ReplicatedLog getReplicatedLog() { + return getRaftActorContext().getReplicatedLog(); } @Override - public byte[] getRestoreFromSnapshot() { + public Snapshot getRestoreFromSnapshot() { return restoreFromSnapshot; } - public static Props props(final String id, final Map peerAddresses, - ConfigParams config){ + public static Props props(final String id, final Map peerAddresses, final ConfigParams config) { return builder().id(id).peerAddresses(peerAddresses).config(config).props(); } public static Props props(final String id, final Map peerAddresses, - ConfigParams config, DataPersistenceProvider dataPersistenceProvider){ - return builder().id(id).peerAddresses(peerAddresses).config(config). - dataPersistenceProvider(dataPersistenceProvider).props(); + final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) { + return builder().id(id).peerAddresses(peerAddresses).config(config) + .dataPersistenceProvider(dataPersistenceProvider).props(); } public static Builder builder() { return new Builder(); } - public static class Builder { + public static class AbstractBuilder, A extends MockRaftActor> { private Map peerAddresses = Collections.emptyMap(); private String id; private ConfigParams config; private DataPersistenceProvider dataPersistenceProvider; private ActorRef roleChangeNotifier; private RaftActorSnapshotMessageSupport snapshotMessageSupport; - private byte[] restoreFromSnapshot; - private Optional persistent = Optional.absent(); + private Snapshot restoreFromSnapshot; + private Optional persistent = Optional.empty(); + private final Class actorClass; + private Function pauseLeaderFunction; + private RaftActorSnapshotCohort snapshotCohort; + + protected AbstractBuilder(final Class actorClass) { + this.actorClass = actorClass; + } - public Builder id(String id) { - this.id = id; - return this; + @SuppressWarnings("unchecked") + private T self() { + return (T) this; } - public Builder peerAddresses(Map peerAddresses) { - this.peerAddresses = peerAddresses; - return this; + public T id(final String newId) { + id = newId; + return self(); } - public Builder config(ConfigParams config) { - this.config = config; - return this; + public T peerAddresses(final Map newPeerAddresses) { + peerAddresses = newPeerAddresses; + return self(); } - public Builder dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) { - this.dataPersistenceProvider = dataPersistenceProvider; - return this; + public T config(final ConfigParams newConfig) { + config = newConfig; + return self(); } - public Builder roleChangeNotifier(ActorRef roleChangeNotifier) { - this.roleChangeNotifier = roleChangeNotifier; - return this; + public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) { + dataPersistenceProvider = newDataPersistenceProvider; + return self(); } - public Builder snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) { - this.snapshotMessageSupport = snapshotMessageSupport; - return this; + public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) { + roleChangeNotifier = newRoleChangeNotifier; + return self(); } - public Builder restoreFromSnapshot(byte[] restoreFromSnapshot) { - this.restoreFromSnapshot = restoreFromSnapshot; - return this; + public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) { + snapshotMessageSupport = newSnapshotMessageSupport; + return self(); } - public Builder persistent(Optional persistent) { - this.persistent = persistent; - return this; + public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) { + restoreFromSnapshot = newRestoreFromSnapshot; + return self(); + } + + public T persistent(final Optional newPersistent) { + persistent = newPersistent; + return self(); + } + + public T pauseLeaderFunction(final Function newPauseLeaderFunction) { + pauseLeaderFunction = newPauseLeaderFunction; + return self(); + } + + public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) { + snapshotCohort = newSnapshotCohort; + return self(); } public Props props() { - return Props.create(MockRaftActor.class, this); + return Props.create(actorClass, this); + } + } + + public static class Builder extends AbstractBuilder { + Builder() { + super(MockRaftActor.class); + } + } + + public static class MockSnapshotState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + private final List state; + + public MockSnapshotState(final List state) { + this.state = state; + } + + public List getState() { + return state; + } + + @Override + public int hashCode() { + return Objects.hash(state); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + MockSnapshotState other = (MockSnapshotState) obj; + if (!Objects.equals(state, other.state)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "MockSnapshotState [state=" + state + "]"; } } }