import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+
import akka.actor.ActorRef;
import akka.actor.Props;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
-
public static final short PAYLOAD_VERSION = 5;
final RaftActor actorDelegate;
protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
private RaftActorRecoverySupport raftActorRecoverySupport;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
- private final byte[] restoreFromSnapshot;
+ private final Snapshot restoreFromSnapshot;
final CountDownLatch snapshotCommitted = new CountDownLatch(1);
+ private final Function<Runnable, Void> pauseLeaderFunction;
- protected MockRaftActor(AbstractBuilder<?, ?> builder) {
- super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
+ protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
+ super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
+ Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
- this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
- if(builder.dataPersistenceProvider == null){
+ this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
+ mock(RaftActorSnapshotCohort.class);
+
+ if (builder.dataPersistenceProvider == null) {
setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
} else {
setPersistence(builder.dataPersistenceProvider);
roleChangeNotifier = builder.roleChangeNotifier;
snapshotMessageSupport = builder.snapshotMessageSupport;
restoreFromSnapshot = builder.restoreFromSnapshot;
+ pauseLeaderFunction = builder.pauseLeaderFunction;
}
- public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
+ public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
raftActorRecoverySupport = support;
}
(snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
}
+ @Override
+ public RaftActorContext getRaftActorContext() {
+ return super.getRaftActorContext();
+ }
+
public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
return snapshotMessageSupport;
}
try {
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
try {
assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
- public void waitUntilLeader(){
- for(int i = 0;i < 10; i++){
- if(isLeader()){
+ public void waitUntilLeader() {
+ for (int i = 0; i < 10; i++) {
+ if (isLeader()) {
break;
}
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
return state;
}
-
- @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+ @Override
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called: {}", persistenceId(), data);
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
}
@Override
- public void appendRecoveredLogEntry(Payload data) {
+ public void appendRecoveredLogEntry(final Payload data) {
state.add(data);
}
}
@Override
- public void applyRecoverySnapshot(byte[] bytes) {
- recoveryCohortDelegate.applyRecoverySnapshot(bytes);
- applySnapshotBytes(bytes);
+ public void applyRecoverySnapshot(final Snapshot.State newState) {
+ recoveryCohortDelegate.applyRecoverySnapshot(newState);
+ applySnapshotState(newState);
}
- private void applySnapshotBytes(byte[] bytes) {
- try {
- Object data = toObject(bytes);
- if (data instanceof List) {
- state.addAll((List<?>) data);
- }
- } catch (Exception e) {
- e.printStackTrace();
+ private void applySnapshotState(final Snapshot.State newState) {
+ if (newState instanceof MockSnapshotState) {
+ state.clear();
+ state.addAll(((MockSnapshotState)newState).getState());
}
}
@Override
- public void createSnapshot(ActorRef actorRef) {
+ public void createSnapshot(final ActorRef actorRef, final java.util.Optional<OutputStream> installSnapshotStream) {
LOG.info("{}: createSnapshot called", persistenceId());
- snapshotCohortDelegate.createSnapshot(actorRef);
+ snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
}
@Override
- public void applySnapshot(byte [] snapshot) {
+ public void applySnapshot(final Snapshot.State newState) {
LOG.info("{}: applySnapshot called", persistenceId());
- applySnapshotBytes(snapshot);
- snapshotCohortDelegate.applySnapshot(snapshot);
+ applySnapshotState(newState);
+ snapshotCohortDelegate.applySnapshot(newState);
+ }
+
+ @Override
+ public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
+ try {
+ return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
+ } catch (IOException e) {
+ throw new RuntimeException("Error deserializing state", e);
+ }
}
@Override
return this.getId();
}
- protected void newBehavior(RaftActorBehavior newBehavior) {
+ protected void newBehavior(final RaftActorBehavior newBehavior) {
self().tell(newBehavior, ActorRef.noSender());
}
@Override
- public void handleCommand(final Object message) {
- if(message instanceof RaftActorBehavior) {
+ protected void handleCommand(final Object message) {
+ if (message instanceof RaftActorBehavior) {
super.changeCurrentBehavior((RaftActorBehavior)message);
} else {
super.handleCommand(message);
- if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
+ if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
snapshotCommitted.countDown();
}
}
}
- public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
- Object obj = null;
- ByteArrayInputStream bis = null;
- ObjectInputStream ois = null;
- try {
- bis = new ByteArrayInputStream(bs);
- ois = new ObjectInputStream(bis);
- obj = ois.readObject();
- } finally {
- if (bis != null) {
- bis.close();
- }
- if (ois != null) {
- ois.close();
- }
+ @Override
+ protected void pauseLeader(final Runnable operation) {
+ if (pauseLeaderFunction != null) {
+ pauseLeaderFunction.apply(operation);
+ } else {
+ super.pauseLeader(operation);
+ }
+ }
+
+ public static List<Object> fromState(final Snapshot.State from) {
+ if (from instanceof MockSnapshotState) {
+ return ((MockSnapshotState)from).getState();
}
- return obj;
+
+ throw new IllegalStateException("Unexpected snapshot State: " + from);
}
- public ReplicatedLog getReplicatedLog(){
+ public ReplicatedLog getReplicatedLog() {
return this.getRaftActorContext().getReplicatedLog();
}
@Override
- public byte[] getRestoreFromSnapshot() {
+ public Snapshot getRestoreFromSnapshot() {
return restoreFromSnapshot;
}
- public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config){
+ public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
return builder().id(id).peerAddresses(peerAddresses).config(config).props();
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
- return builder().id(id).peerAddresses(peerAddresses).config(config).
- dataPersistenceProvider(dataPersistenceProvider).props();
+ final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
+ return builder().id(id).peerAddresses(peerAddresses).config(config)
+ .dataPersistenceProvider(dataPersistenceProvider).props();
}
public static Builder builder() {
private DataPersistenceProvider dataPersistenceProvider;
private ActorRef roleChangeNotifier;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
- private byte[] restoreFromSnapshot;
+ private Snapshot restoreFromSnapshot;
private Optional<Boolean> persistent = Optional.absent();
private final Class<A> actorClass;
+ private Function<Runnable, Void> pauseLeaderFunction;
+ private RaftActorSnapshotCohort snapshotCohort;
- protected AbstractBuilder(Class<A> actorClass) {
+ protected AbstractBuilder(final Class<A> actorClass) {
this.actorClass = actorClass;
}
return (T) this;
}
- public T id(String id) {
- this.id = id;
+ public T id(final String newId) {
+ this.id = newId;
+ return self();
+ }
+
+ public T peerAddresses(final Map<String, String> newPeerAddresses) {
+ this.peerAddresses = newPeerAddresses;
+ return self();
+ }
+
+ public T config(final ConfigParams newConfig) {
+ this.config = newConfig;
return self();
}
- public T peerAddresses(Map<String, String> peerAddresses) {
- this.peerAddresses = peerAddresses;
+ public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
+ this.dataPersistenceProvider = newDataPersistenceProvider;
return self();
}
- public T config(ConfigParams config) {
- this.config = config;
+ public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
+ this.roleChangeNotifier = newRoleChangeNotifier;
return self();
}
- public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
- this.dataPersistenceProvider = dataPersistenceProvider;
+ public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
+ this.snapshotMessageSupport = newSnapshotMessageSupport;
return self();
}
- public T roleChangeNotifier(ActorRef roleChangeNotifier) {
- this.roleChangeNotifier = roleChangeNotifier;
+ public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
+ this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
- this.snapshotMessageSupport = snapshotMessageSupport;
+ public T persistent(final Optional<Boolean> newPersistent) {
+ this.persistent = newPersistent;
return self();
}
- public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
- this.restoreFromSnapshot = restoreFromSnapshot;
+ public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
+ this.pauseLeaderFunction = newPauseLeaderFunction;
return self();
}
- public T persistent(Optional<Boolean> persistent) {
- this.persistent = persistent;
+ public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
+ this.snapshotCohort = newSnapshotCohort;
return self();
}
}
public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
- private Builder() {
+ Builder() {
super(MockRaftActor.class);
}
}
+
+ public static class MockSnapshotState implements Snapshot.State {
+ private static final long serialVersionUID = 1L;
+
+ private final List<Object> state;
+
+ public MockSnapshotState(final List<Object> state) {
+ this.state = state;
+ }
+
+ public List<Object> getState() {
+ return state;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (state == null ? 0 : state.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MockSnapshotState other = (MockSnapshotState) obj;
+ if (state == null) {
+ if (other.state != null) {
+ return false;
+ }
+ } else if (!state.equals(other.state)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "MockSnapshotState [state=" + state + "]";
+ }
+ }
}