import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+
import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayInputStream;
import java.io.IOException;
private final Function<Runnable, Void> pauseLeaderFunction;
protected MockRaftActor(AbstractBuilder<?, ?> builder) {
- super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
+ super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
+ Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
- this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
- if(builder.dataPersistenceProvider == null){
+ this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
+ mock(RaftActorSnapshotCohort.class);
+
+ if (builder.dataPersistenceProvider == null) {
setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
} else {
setPersistence(builder.dataPersistenceProvider);
try {
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- e.printStackTrace();
+ Throwables.propagate(e);
}
}
try {
assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- e.printStackTrace();
+ Throwables.propagate(e);
}
}
- public void waitUntilLeader(){
- for(int i = 0;i < 10; i++){
- if(isLeader()){
+ public void waitUntilLeader() {
+ for (int i = 0; i < 10; i++) {
+ if (isLeader()) {
break;
}
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
return state;
}
-
@Override
protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
actorDelegate.applyState(clientActor, identifier, data);
}
private void applySnapshotBytes(byte[] bytes) {
+ if (bytes.length == 0) {
+ return;
+ }
+
try {
Object data = toObject(bytes);
if (data instanceof List) {
+ state.clear();
state.addAll((List<?>) data);
}
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (ClassNotFoundException | IOException e) {
+ Throwables.propagate(e);
}
}
@Override
protected void handleCommand(final Object message) {
- if(message instanceof RaftActorBehavior) {
+ if (message instanceof RaftActorBehavior) {
super.changeCurrentBehavior((RaftActorBehavior)message);
} else {
super.handleCommand(message);
- if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
+ if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
snapshotCommitted.countDown();
}
}
@Override
protected void pauseLeader(Runnable operation) {
- if(pauseLeaderFunction != null) {
+ if (pauseLeaderFunction != null) {
pauseLeaderFunction.apply(operation);
} else {
super.pauseLeader(operation);
return obj;
}
- public ReplicatedLog getReplicatedLog(){
+ public ReplicatedLog getReplicatedLog() {
return this.getRaftActorContext().getReplicatedLog();
}
return restoreFromSnapshot;
}
- public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config){
+ public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
return builder().id(id).peerAddresses(peerAddresses).config(config).props();
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
- return builder().id(id).peerAddresses(peerAddresses).config(config).
- dataPersistenceProvider(dataPersistenceProvider).props();
+ ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
+ return builder().id(id).peerAddresses(peerAddresses).config(config)
+ .dataPersistenceProvider(dataPersistenceProvider).props();
}
public static Builder builder() {
private Optional<Boolean> persistent = Optional.absent();
private final Class<A> actorClass;
private Function<Runnable, Void> pauseLeaderFunction;
+ private RaftActorSnapshotCohort snapshotCohort;
protected AbstractBuilder(Class<A> actorClass) {
this.actorClass = actorClass;
return (T) this;
}
- public T id(String id) {
- this.id = id;
+ public T id(String newId) {
+ this.id = newId;
+ return self();
+ }
+
+ public T peerAddresses(Map<String, String> newPeerAddresses) {
+ this.peerAddresses = newPeerAddresses;
return self();
}
- public T peerAddresses(Map<String, String> peerAddresses) {
- this.peerAddresses = peerAddresses;
+ public T config(ConfigParams newConfig) {
+ this.config = newConfig;
return self();
}
- public T config(ConfigParams config) {
- this.config = config;
+ public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
+ this.dataPersistenceProvider = newDataPersistenceProvider;
return self();
}
- public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
- this.dataPersistenceProvider = dataPersistenceProvider;
+ public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
+ this.roleChangeNotifier = newRoleChangeNotifier;
return self();
}
- public T roleChangeNotifier(ActorRef roleChangeNotifier) {
- this.roleChangeNotifier = roleChangeNotifier;
+ public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
+ this.snapshotMessageSupport = newSnapshotMessageSupport;
return self();
}
- public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
- this.snapshotMessageSupport = snapshotMessageSupport;
+ public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
+ this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
- this.restoreFromSnapshot = restoreFromSnapshot;
+ public T persistent(Optional<Boolean> newPersistent) {
+ this.persistent = newPersistent;
return self();
}
- public T persistent(Optional<Boolean> persistent) {
- this.persistent = persistent;
+ public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
+ this.pauseLeaderFunction = newPauseLeaderFunction;
return self();
}
- public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
- this.pauseLeaderFunction = pauseLeaderFunction;
+ public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
+ this.snapshotCohort = newSnapshotCohort;
return self();
}