import akka.actor.Props;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
final CountDownLatch snapshotCommitted = new CountDownLatch(1);
private final Function<Runnable, Void> pauseLeaderFunction;
- protected MockRaftActor(AbstractBuilder<?, ?> builder) {
+ protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
state = new ArrayList<>();
pauseLeaderFunction = builder.pauseLeaderFunction;
}
- public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
+ public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
raftActorRecoverySupport = support;
}
try {
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
try {
assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}
@Override
- protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called: {}", persistenceId(), data);
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
}
@Override
- public void appendRecoveredLogEntry(Payload data) {
+ public void appendRecoveredLogEntry(final Payload data) {
state.add(data);
}
}
@Override
- public void applyRecoverySnapshot(Snapshot.State newState) {
+ public void applyRecoverySnapshot(final Snapshot.State newState) {
recoveryCohortDelegate.applyRecoverySnapshot(newState);
applySnapshotState(newState);
}
- private void applySnapshotState(Snapshot.State newState) {
+ private void applySnapshotState(final Snapshot.State newState) {
if (newState instanceof MockSnapshotState) {
state.clear();
state.addAll(((MockSnapshotState)newState).getState());
}
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final java.util.Optional<OutputStream> installSnapshotStream) {
LOG.info("{}: createSnapshot called", persistenceId());
snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
}
@Override
- public void applySnapshot(Snapshot.State newState) {
+ public void applySnapshot(final Snapshot.State newState) {
LOG.info("{}: applySnapshot called", persistenceId());
applySnapshotState(newState);
snapshotCohortDelegate.applySnapshot(newState);
}
@Override
- public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+ public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
try {
return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
} catch (IOException e) {
return this.getId();
}
- protected void newBehavior(RaftActorBehavior newBehavior) {
+ protected void newBehavior(final RaftActorBehavior newBehavior) {
self().tell(newBehavior, ActorRef.noSender());
}
}
@Override
- protected void pauseLeader(Runnable operation) {
+ protected void pauseLeader(final Runnable operation) {
if (pauseLeaderFunction != null) {
pauseLeaderFunction.apply(operation);
} else {
}
}
- public static List<Object> fromState(Snapshot.State from) {
+ public static List<Object> fromState(final Snapshot.State from) {
if (from instanceof MockSnapshotState) {
return ((MockSnapshotState)from).getState();
}
return restoreFromSnapshot;
}
- public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
+ public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
return builder().id(id).peerAddresses(peerAddresses).config(config).props();
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
+ final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
return builder().id(id).peerAddresses(peerAddresses).config(config)
.dataPersistenceProvider(dataPersistenceProvider).props();
}
private Function<Runnable, Void> pauseLeaderFunction;
private RaftActorSnapshotCohort snapshotCohort;
- protected AbstractBuilder(Class<A> actorClass) {
+ protected AbstractBuilder(final Class<A> actorClass) {
this.actorClass = actorClass;
}
return (T) this;
}
- public T id(String newId) {
+ public T id(final String newId) {
this.id = newId;
return self();
}
- public T peerAddresses(Map<String, String> newPeerAddresses) {
+ public T peerAddresses(final Map<String, String> newPeerAddresses) {
this.peerAddresses = newPeerAddresses;
return self();
}
- public T config(ConfigParams newConfig) {
+ public T config(final ConfigParams newConfig) {
this.config = newConfig;
return self();
}
- public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
+ public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
this.dataPersistenceProvider = newDataPersistenceProvider;
return self();
}
- public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
+ public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
this.roleChangeNotifier = newRoleChangeNotifier;
return self();
}
- public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
+ public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
this.snapshotMessageSupport = newSnapshotMessageSupport;
return self();
}
- public T restoreFromSnapshot(Snapshot newRestoreFromSnapshot) {
+ public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T persistent(Optional<Boolean> newPersistent) {
+ public T persistent(final Optional<Boolean> newPersistent) {
this.persistent = newPersistent;
return self();
}
- public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
+ public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
this.pauseLeaderFunction = newPauseLeaderFunction;
return self();
}
- public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
+ public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
this.snapshotCohort = newSnapshotCohort;
return self();
}
}
public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
- private Builder() {
+ Builder() {
super(MockRaftActor.class);
}
}
private final List<Object> state;
- public MockSnapshotState(List<Object> state) {
+ public MockSnapshotState(final List<Object> state) {
this.state = state;
}
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}