X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FMockRaftActor.java;h=b4b558b6f357a908c8ac537dd80e86c3abdd5eca;hp=586ca8cda05fac488d448949c0840ead847432df;hb=e1eca73a5ae2ffae8dd78c6fe5281cd2f45d5ef3;hpb=2727bea09c83646b6cbd2ef9672d0b7f6cf3b22f diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 586ca8cda0..b4b558b6f3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -10,76 +10,64 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; + import akka.actor.ActorRef; import akka.actor.Props; -import akka.japi.Creator; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.Uninterruptibles; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { + public static final short PAYLOAD_VERSION = 5; final RaftActor actorDelegate; final RaftActorRecoveryCohort recoveryCohortDelegate; - final RaftActorSnapshotCohort snapshotCohortDelegate; + volatile RaftActorSnapshotCohort snapshotCohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; - private ActorRef roleChangeNotifier; - private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); + private final ActorRef roleChangeNotifier; + protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); private RaftActorRecoverySupport raftActorRecoverySupport; private RaftActorSnapshotMessageSupport snapshotMessageSupport; + private final byte[] restoreFromSnapshot; + final CountDownLatch snapshotCommitted = new CountDownLatch(1); + private final Function pauseLeaderFunction; - public static final class MockRaftActorCreator implements Creator { - private static final long serialVersionUID = 1L; - private final Map peerAddresses; - private final String id; - private final Optional config; - private final DataPersistenceProvider dataPersistenceProvider; - private final ActorRef roleChangeNotifier; - private RaftActorSnapshotMessageSupport snapshotMessageSupport; - - private MockRaftActorCreator(Map peerAddresses, String id, - Optional config, DataPersistenceProvider dataPersistenceProvider, - ActorRef roleChangeNotifier) { - this.peerAddresses = peerAddresses; - this.id = id; - this.config = config; - this.dataPersistenceProvider = dataPersistenceProvider; - this.roleChangeNotifier = roleChangeNotifier; - } - - @Override - public MockRaftActor create() throws Exception { - MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config, - dataPersistenceProvider); - mockRaftActor.roleChangeNotifier = this.roleChangeNotifier; - mockRaftActor.snapshotMessageSupport = snapshotMessageSupport; - return mockRaftActor; - } - } - - public MockRaftActor(String id, Map peerAddresses, Optional config, - DataPersistenceProvider dataPersistenceProvider) { - super(id, peerAddresses, config); + protected MockRaftActor(AbstractBuilder builder) { + super(builder.id, builder.peerAddresses != null ? builder.peerAddresses : + Collections.emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION); state = new ArrayList<>(); this.actorDelegate = mock(RaftActor.class); this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); - this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); - if(dataPersistenceProvider == null){ - setPersistence(true); + + this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort : + mock(RaftActorSnapshotCohort.class); + + if (builder.dataPersistenceProvider == null) { + setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true); } else { - setPersistence(dataPersistenceProvider); + setPersistence(builder.dataPersistenceProvider); } + + roleChangeNotifier = builder.roleChangeNotifier; + snapshotMessageSupport = builder.snapshotMessageSupport; + restoreFromSnapshot = builder.restoreFromSnapshot; + pauseLeaderFunction = builder.pauseLeaderFunction; } public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) { @@ -93,14 +81,19 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { - return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport(); + return snapshotMessageSupport != null ? snapshotMessageSupport : + (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport()); + } + + public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() { + return snapshotMessageSupport; } public void waitForRecoveryComplete() { try { assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); } catch (InterruptedException e) { - e.printStackTrace(); + Throwables.propagate(e); } } @@ -108,14 +101,14 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, try { assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); } catch (InterruptedException e) { - e.printStackTrace(); + Throwables.propagate(e); } } - public void waitUntilLeader(){ - for(int i = 0;i < 10; i++){ - if(isLeader()){ + public void waitUntilLeader() { + for (int i = 0; i < 10; i++) { + if (isLeader()) { break; } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -126,35 +119,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return state; } - public static Props props(final String id, final Map peerAddresses, - Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null)); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, RaftActorSnapshotMessageSupport snapshotMessageSupport){ - MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null); - creator.snapshotMessageSupport = snapshotMessageSupport; - return Props.create(creator); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null)); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, ActorRef roleChangeNotifier){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, ActorRef roleChangeNotifier, - DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); - } - - @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { + @Override + protected void applyState(ActorRef clientActor, Identifier identifier, Object data) { actorDelegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called: {}", persistenceId(), data); @@ -200,13 +166,22 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override public void applyRecoverySnapshot(byte[] bytes) { recoveryCohortDelegate.applyRecoverySnapshot(bytes); + applySnapshotBytes(bytes); + } + + private void applySnapshotBytes(byte[] bytes) { + if (bytes.length == 0) { + return; + } + try { Object data = toObject(bytes); if (data instanceof List) { + state.clear(); state.addAll((List) data); } - } catch (Exception e) { - e.printStackTrace(); + } catch (ClassNotFoundException | IOException e) { + Throwables.propagate(e); } } @@ -219,6 +194,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override public void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); + applySnapshotBytes(snapshot); snapshotCohortDelegate.applySnapshot(snapshot); } @@ -236,6 +212,32 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return this.getId(); } + protected void newBehavior(RaftActorBehavior newBehavior) { + self().tell(newBehavior, ActorRef.noSender()); + } + + @Override + protected void handleCommand(final Object message) { + if (message instanceof RaftActorBehavior) { + super.changeCurrentBehavior((RaftActorBehavior)message); + } else { + super.handleCommand(message); + + if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) { + snapshotCommitted.countDown(); + } + } + } + + @Override + protected void pauseLeader(Runnable operation) { + if (pauseLeaderFunction != null) { + pauseLeaderFunction.apply(operation); + } else { + super.pauseLeader(operation); + } + } + public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; @@ -255,7 +257,109 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return obj; } - public ReplicatedLog getReplicatedLog(){ + public ReplicatedLog getReplicatedLog() { return this.getRaftActorContext().getReplicatedLog(); } -} \ No newline at end of file + + @Override + public byte[] getRestoreFromSnapshot() { + return restoreFromSnapshot; + } + + public static Props props(final String id, final Map peerAddresses, ConfigParams config) { + return builder().id(id).peerAddresses(peerAddresses).config(config).props(); + } + + public static Props props(final String id, final Map peerAddresses, + ConfigParams config, DataPersistenceProvider dataPersistenceProvider) { + return builder().id(id).peerAddresses(peerAddresses).config(config) + .dataPersistenceProvider(dataPersistenceProvider).props(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class AbstractBuilder, A extends MockRaftActor> { + private Map peerAddresses = Collections.emptyMap(); + private String id; + private ConfigParams config; + private DataPersistenceProvider dataPersistenceProvider; + private ActorRef roleChangeNotifier; + private RaftActorSnapshotMessageSupport snapshotMessageSupport; + private byte[] restoreFromSnapshot; + private Optional persistent = Optional.absent(); + private final Class actorClass; + private Function pauseLeaderFunction; + private RaftActorSnapshotCohort snapshotCohort; + + protected AbstractBuilder(Class actorClass) { + this.actorClass = actorClass; + } + + @SuppressWarnings("unchecked") + private T self() { + return (T) this; + } + + public T id(String newId) { + this.id = newId; + return self(); + } + + public T peerAddresses(Map newPeerAddresses) { + this.peerAddresses = newPeerAddresses; + return self(); + } + + public T config(ConfigParams newConfig) { + this.config = newConfig; + return self(); + } + + public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) { + this.dataPersistenceProvider = newDataPersistenceProvider; + return self(); + } + + public T roleChangeNotifier(ActorRef newRoleChangeNotifier) { + this.roleChangeNotifier = newRoleChangeNotifier; + return self(); + } + + public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) { + this.snapshotMessageSupport = newSnapshotMessageSupport; + return self(); + } + + public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) { + this.restoreFromSnapshot = newRestoreFromSnapshot; + return self(); + } + + public T persistent(Optional newPersistent) { + this.persistent = newPersistent; + return self(); + } + + public T pauseLeaderFunction(Function newPauseLeaderFunction) { + this.pauseLeaderFunction = newPauseLeaderFunction; + return self(); + } + + public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) { + this.snapshotCohort = newSnapshotCohort; + return self(); + } + + public Props props() { + return Props.create(actorClass, this); + } + } + + public static class Builder extends AbstractBuilder { + private Builder() { + super(MockRaftActor.class); + } + } +}