import akka.actor.ActorRef;
import akka.actor.InvalidActorNameException;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
private final TestActorRef<MessageCollectorActor> collectorActor;
private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
- private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- super(id, peerAddresses, Optional.of(config), null);
- this.collectorActor = collectorActor;
- }
-
- public static Props props(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- return Props.create(TestRaftActor.class, id, peerAddresses, config, collectorActor).
- withDispatcher(Dispatchers.DefaultDispatcherId());
+ private TestRaftActor(Builder builder) {
+ super(builder);
+ this.collectorActor = builder.collectorActor;
}
void startDropMessages(Class<?> msgClass) {
}
void setMockTotalMemory(final long mockTotalMemory) {
- if(mockTotalMemory > 0) {
- getRaftActorContext().setTotalMemoryRetriever(new Supplier<Long>() {
- @Override
- public Long get() {
- return mockTotalMemory;
- }
-
- });
- } else {
- getRaftActorContext().setTotalMemoryRetriever(null);
- }
+ getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
}
@Override
public ActorRef collectorActor() {
return collectorActor;
}
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
+ private TestActorRef<MessageCollectorActor> collectorActor;
+
+ public Builder collectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+ this.collectorActor = collectorActor;
+ return this;
+ }
+
+ private Builder() {
+ super(TestRaftActor.class);
+ }
+ }
}
+ protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected final TestActorFactory factory = new TestActorFactory(getSystem());
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
return configParams;
}
protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
ConfigParams configParams) {
- TestActorRef<MessageCollectorActor> collectorActor = factory.createTestActor(
+ return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses :
+ Collections.<String, String>emptyMap()).config(configParams));
+ }
+
+ protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) {
+ builder.collectorActor(factory.<MessageCollectorActor>createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- factory.generateActorId(id + "-collector"));
+ factory.generateActorId(id + "-collector"))).id(id);
InvalidActorNameException lastEx = null;
for(int i = 0; i < 10; i++) {
try {
- return factory.createTestActor(TestRaftActor.props(id,
- peerAddresses != null ? peerAddresses : Collections.<String, String>emptyMap(),
- configParams, collectorActor), id);
+ return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
} catch (InvalidActorNameException e) {
lastEx = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
protected String testActorPath(String id){
- return "akka://test/user" + id;
+ return factory.createTestActorPath(id);
}
protected void verifyLeadersTrimmedLog(long lastIndex) {