import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.japi.Creator;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
-
public static final short PAYLOAD_VERSION = 5;
final RaftActor actorDelegate;
final RaftActorRecoveryCohort recoveryCohortDelegate;
- final RaftActorSnapshotCohort snapshotCohortDelegate;
+ volatile RaftActorSnapshotCohort snapshotCohortDelegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
- private ActorRef roleChangeNotifier;
- private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
+ private final ActorRef roleChangeNotifier;
+ protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
private RaftActorRecoverySupport raftActorRecoverySupport;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
+ private final byte[] restoreFromSnapshot;
+ final CountDownLatch snapshotCommitted = new CountDownLatch(1);
+ private final Function<Runnable, Void> pauseLeaderFunction;
- public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
- private static final long serialVersionUID = 1L;
- private final Map<String, String> peerAddresses;
- private final String id;
- private final Optional<ConfigParams> config;
- private final DataPersistenceProvider dataPersistenceProvider;
- private final ActorRef roleChangeNotifier;
- private RaftActorSnapshotMessageSupport snapshotMessageSupport;
-
- private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
- Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
- ActorRef roleChangeNotifier) {
- this.peerAddresses = peerAddresses;
- this.id = id;
- this.config = config;
- this.dataPersistenceProvider = dataPersistenceProvider;
- this.roleChangeNotifier = roleChangeNotifier;
- }
-
- @Override
- public MockRaftActor create() throws Exception {
- MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
- dataPersistenceProvider);
- mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
- mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
- return mockRaftActor;
- }
- }
-
- public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
- DataPersistenceProvider dataPersistenceProvider) {
- super(id, peerAddresses, config, PAYLOAD_VERSION);
+ protected MockRaftActor(AbstractBuilder<?, ?> builder) {
+ super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
- if(dataPersistenceProvider == null){
- setPersistence(true);
+
+ if(builder.dataPersistenceProvider == null){
+ setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
} else {
- setPersistence(dataPersistenceProvider);
+ setPersistence(builder.dataPersistenceProvider);
}
+
+ roleChangeNotifier = builder.roleChangeNotifier;
+ snapshotMessageSupport = builder.snapshotMessageSupport;
+ restoreFromSnapshot = builder.restoreFromSnapshot;
+ pauseLeaderFunction = builder.pauseLeaderFunction;
}
public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
@Override
protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
- return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
+ return snapshotMessageSupport != null ? snapshotMessageSupport :
+ (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
+ }
+
+ public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
+ return snapshotMessageSupport;
}
public void waitForRecoveryComplete() {
return state;
}
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
- }
-
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
- MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
- creator.snapshotMessageSupport = snapshotMessageSupport;
- return Props.create(creator);
- }
-
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
- }
-
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config, ActorRef roleChangeNotifier){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
- }
-
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config, ActorRef roleChangeNotifier,
- DataPersistenceProvider dataPersistenceProvider){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
- }
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
actorDelegate.applyState(clientActor, identifier, data);
@Override
public void applyRecoverySnapshot(byte[] bytes) {
recoveryCohortDelegate.applyRecoverySnapshot(bytes);
+ applySnapshotBytes(bytes);
+ }
+
+ private void applySnapshotBytes(byte[] bytes) {
try {
Object data = toObject(bytes);
if (data instanceof List) {
@Override
public void applySnapshot(byte [] snapshot) {
LOG.info("{}: applySnapshot called", persistenceId());
+ applySnapshotBytes(snapshot);
snapshotCohortDelegate.applySnapshot(snapshot);
}
return this.getId();
}
+ protected void newBehavior(RaftActorBehavior newBehavior) {
+ self().tell(newBehavior, ActorRef.noSender());
+ }
+
+ @Override
+ public void handleCommand(final Object message) {
+ if(message instanceof RaftActorBehavior) {
+ super.changeCurrentBehavior((RaftActorBehavior)message);
+ } else {
+ super.handleCommand(message);
+
+ if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
+ snapshotCommitted.countDown();
+ }
+ }
+ }
+
+ @Override
+ protected void pauseLeader(Runnable operation) {
+ if(pauseLeaderFunction != null) {
+ pauseLeaderFunction.apply(operation);
+ } else {
+ super.pauseLeader(operation);
+ }
+ }
+
public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
public ReplicatedLog getReplicatedLog(){
return this.getRaftActorContext().getReplicatedLog();
}
-}
\ No newline at end of file
+
+ @Override
+ public byte[] getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
+
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ ConfigParams config){
+ return builder().id(id).peerAddresses(peerAddresses).config(config).props();
+ }
+
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
+ return builder().id(id).peerAddresses(peerAddresses).config(config).
+ dataPersistenceProvider(dataPersistenceProvider).props();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
+ private Map<String, String> peerAddresses = Collections.emptyMap();
+ private String id;
+ private ConfigParams config;
+ private DataPersistenceProvider dataPersistenceProvider;
+ private ActorRef roleChangeNotifier;
+ private RaftActorSnapshotMessageSupport snapshotMessageSupport;
+ private byte[] restoreFromSnapshot;
+ private Optional<Boolean> persistent = Optional.absent();
+ private final Class<A> actorClass;
+ private Function<Runnable, Void> pauseLeaderFunction;
+
+ protected AbstractBuilder(Class<A> actorClass) {
+ this.actorClass = actorClass;
+ }
+
+ @SuppressWarnings("unchecked")
+ private T self() {
+ return (T) this;
+ }
+
+ public T id(String id) {
+ this.id = id;
+ return self();
+ }
+
+ public T peerAddresses(Map<String, String> peerAddresses) {
+ this.peerAddresses = peerAddresses;
+ return self();
+ }
+
+ public T config(ConfigParams config) {
+ this.config = config;
+ return self();
+ }
+
+ public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
+ this.dataPersistenceProvider = dataPersistenceProvider;
+ return self();
+ }
+
+ public T roleChangeNotifier(ActorRef roleChangeNotifier) {
+ this.roleChangeNotifier = roleChangeNotifier;
+ return self();
+ }
+
+ public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
+ this.snapshotMessageSupport = snapshotMessageSupport;
+ return self();
+ }
+
+ public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ return self();
+ }
+
+ public T persistent(Optional<Boolean> persistent) {
+ this.persistent = persistent;
+ return self();
+ }
+
+ public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
+ this.pauseLeaderFunction = pauseLeaderFunction;
+ return self();
+ }
+
+ public Props props() {
+ return Props.create(actorClass, this);
+ }
+ }
+
+ public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
+ private Builder() {
+ super(MockRaftActor.class);
+ }
+ }
+}