import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
- private ActorSystem newActorSystem(String config) {
- ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
- actorSystems.add(system);
- return system;
+ private ActorSystem newActorSystem(final String config) {
+ return newActorSystem("cluster-test", config);
}
- private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+ private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if (system == getSystem()) {
- return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
+ return actorFactory.createActor(MessageCollectorActor.props(), name);
}
- return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+ return system.actorOf(MessageCollectorActor.props(), name);
}
private Props newShardMgrProps() {
return newShardMgrProps(new MockConfiguration());
}
- private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
+ private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
}
- private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
- return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
+ private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
+ return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
+ .distributedDataStore(mock(DistributedDataStore.class));
}
Dispatchers.DefaultDispatcherId());
}
- private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
- return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
+ private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
+ return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId());
}
return newTestShardManager(newShardMgrProps());
}
- private TestShardManager newTestShardManager(Props props) {
+ private TestShardManager newTestShardManager(final Props props) {
TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
TestShardManager shardManager = shardManagerActor.underlyingActor();
shardManager.waitForRecoveryComplete();
return shardManager;
}
- private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
+ private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
+ final JavaTestKit kit) {
AssertionError last = null;
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
}
@SuppressWarnings("unchecked")
- private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
+ private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final JavaTestKit kit, final String msg) {
Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
if (reply instanceof Failure) {
throw new AssertionError(msg + " failed", ((Failure)reply).cause());
final MockConfiguration mockConfig = new MockConfiguration() {
@Override
- public Collection<String> getMemberShardNames(MemberName memberName) {
+ public Collection<String> getMemberShardNames(final MemberName memberName) {
return Arrays.asList("default", "topology");
}
@Override
- public Collection<MemberName> getMembersFromShardName(String shardName) {
+ public Collection<MemberName> getMembersFromShardName(final String shardName) {
return members("member-1");
}
};
- final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
- Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
- final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
- Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
+ final ActorRef defaultShardActor = actorFactory.createActor(
+ MessageCollectorActor.props(), actorFactory.generateActorId("default"));
+ final ActorRef topologyShardActor = actorFactory.createActor(
+ MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
new HashMap<String, Entry<ActorRef, DatastoreContext>>());
final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
final CountDownLatch newShardActorLatch = new CountDownLatch(2);
class LocalShardManager extends ShardManager {
- LocalShardManager(AbstractShardManagerCreator<?> creator) {
+ LocalShardManager(final AbstractShardManagerCreator<?> creator) {
super(creator);
}
@Override
- protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ protected ActorRef newShardActor(final ShardInformation info) {
Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
ActorRef ref = null;
if (entry != null) {
LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
}
- @Test
- public void testOnRecoveryJournalIsCleaned() {
- String persistenceID = "shard-manager-" + shardMrgIDSuffix;
- InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
- InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
- InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
-
- newTestShardManager();
-
- InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
-
- // Journal entries up to the last one should've been deleted
- Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
- synchronized (journal) {
- assertEquals("Journal size", 0, journal.size());
- }
- }
-
@Test
public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
TestShardManager shardManager = newTestShardManager();
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
- public List<String> getMemberShardNames(MemberName memberName) {
+ public List<String> getMemberShardNames(final MemberName memberName) {
return Arrays.asList("default", "astronauts");
}
}));
};
}
- private static List<MemberName> members(String... names) {
+ private static List<MemberName> members(final String... names) {
return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
}
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
SchemaContext schemaContext = TestModel.createTestContext();
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
new JavaTestKit(getSystem()) {
{
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
Shard.Builder shardBuilder = Shard.builder();
.put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
.put("astronauts", Collections.<String>emptyList()).build());
- TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
Failure failure = kit.expectMsgClass(Failure.class);
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
- Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
+ Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
- byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
- assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
- ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
+ ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
+ assertNotNull("Expected ShardManagerSnapshot", snapshot);
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
Sets.newHashSet(snapshot.getShardList()));
.put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
.put("astronauts", Collections.<String>emptyList()).build());
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
- DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
- SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
+ DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
+ Collections.<ShardSnapshot>emptyList());
TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
.restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
- byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
- assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
- snapshot = SerializationUtils.deserialize(snapshotBytes);
+ assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
- Sets.newHashSet(snapshot.getShardList()));
+ Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
LOG.info("testRestoreFromSnapshot ending");
}
new JavaTestKit(getSystem()) {
{
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new AddShardReplica("model-inventory"), getRef());
Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
// Have a dummy snapshot to be overwritten by the new data
// persisted.
String[] restoredShards = { "default", "people" };
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
- newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
+ newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
shardManager.underlyingActor()
.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
.put("astronauts", Arrays.asList("member-2")).build());
final ActorRef newReplicaShardManager = actorFactory
- .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID);
+ .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
new JavaTestKit(getSystem()) {
{
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- final TestActorRef<MockRespondActor> respondActor = actorFactory
- .createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
- new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
+ final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
+ RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
.put("astronauts", Arrays.asList("member-2"))
.put("people", Arrays.asList("member-1", "member-2")).build());
- TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newShardMgrProps(mockConfig));
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
+ newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.underlyingActor().waitForRecoveryComplete();
shardManager.tell(new FindLocalShard("people", false), getRef());
.put("people", Arrays.asList("member-1", "member-2")).build());
String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
- TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(MessageCollectorActor.props(),
- shardId);
+ ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
+ .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.underlyingActor().waitForRecoveryComplete();
LOG.info("testShardPersistenceWithRestoredData starting");
new JavaTestKit(getSystem()) {
{
- MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
- .put("default", Arrays.asList("member-1", "member-2"))
- .put("astronauts", Arrays.asList("member-2"))
- .put("people", Arrays.asList("member-1", "member-2")).build());
- String[] restoredShards = { "default", "astronauts" };
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder()
+ .put("default", Arrays.asList("member-1", "member-2"))
+ .put("astronauts", Arrays.asList("member-2"))
+ .put("people", Arrays.asList("member-1", "member-2")).build());
+ String[] restoredShards = {"default", "astronauts"};
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
// create shardManager to come up with restored data
- TestActorRef<TestShardManager> newRestoredShardManager = actorFactory
- .createTestActor(newShardMgrProps(mockConfig));
+ TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
+ newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
.put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
- TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
- MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
+ ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
- TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
- MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
+ ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
- TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("shard1", shard1)
- .addShardActor("shard2", shard2).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
+ .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), shard1);
{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- TestActorRef<MockRespondActor> respondActor = actorFactory
- .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ ActorRef respondActor = actorFactory
+ .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- TestActorRef<MockRespondActor> respondActor = actorFactory
- .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ ActorRef respondActor = actorFactory
+ .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
private CountDownLatch memberReachableReceived = new CountDownLatch(1);
private volatile MessageInterceptor messageInterceptor;
- private TestShardManager(Builder builder) {
+ TestShardManager(final Builder builder) {
super(builder);
shardActor = builder.shardActor;
shardActors = builder.shardActors;
}
@Override
- protected void handleRecover(Object message) throws Exception {
+ protected void handleRecover(final Object message) throws Exception {
try {
super.handleRecover(message);
} finally {
}
}
- private void countDownIfOther(final Member member, CountDownLatch latch) {
+ private void countDownIfOther(final Member member, final CountDownLatch latch) {
if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
latch.countDown();
}
}
@Override
- public void handleCommand(Object message) throws Exception {
+ public void handleCommand(final Object message) throws Exception {
try {
if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
getSender().tell(messageInterceptor.apply(message), getSelf());
}
}
- void setMessageInterceptor(MessageInterceptor messageInterceptor) {
+ void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
this.messageInterceptor = messageInterceptor;
}
findPrimaryMessageReceived = new CountDownLatch(1);
}
- public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
+ public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
return new Builder(datastoreContextBuilder);
}
private ActorRef shardActor;
private final Map<String, ActorRef> shardActors = new HashMap<>();
- Builder(DatastoreContext.Builder datastoreContextBuilder) {
+ Builder(final DatastoreContext.Builder datastoreContextBuilder) {
super(TestShardManager.class);
datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
}
- Builder shardActor(ActorRef newShardActor) {
+ Builder shardActor(final ActorRef newShardActor) {
this.shardActor = newShardActor;
return this;
}
- Builder addShardActor(String shardName, ActorRef actorRef) {
+ Builder addShardActor(final String shardName, final ActorRef actorRef) {
shardActors.put(shardName, actorRef);
return this;
}
}
@Override
- public void saveSnapshot(Object obj) {
+ public void saveSnapshot(final Object obj) {
snapshot = (ShardManagerSnapshot) obj;
snapshotPersist.countDown();
super.saveSnapshot(obj);
}
- void verifySnapshotPersisted(Set<String> shardList) {
+ void verifySnapshotPersisted(final Set<String> shardList) {
assertEquals("saveSnapshot invoked", true,
Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
}
@Override
- protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ protected ActorRef newShardActor(final ShardInformation info) {
if (shardActors.get(info.getShardName()) != null) {
return shardActors.get(info.getShardName());
}
return shardActor;
}
- return super.newShardActor(schemaContext, info);
+ return super.newShardActor(info);
}
}
extends AbstractShardManagerCreator<T> {
private final Class<C> shardManagerClass;
- AbstractGenericCreator(Class<C> shardManagerClass) {
+ AbstractGenericCreator(final Class<C> shardManagerClass) {
this.shardManagerClass = shardManagerClass;
cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
.primaryShardInfoCache(new PrimaryShardInfoFutureCache());
}
private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
- GenericCreator(Class<C> shardManagerClass) {
+ GenericCreator(final Class<C> shardManagerClass) {
super(shardManagerClass);
}
}
private static final long serialVersionUID = 1L;
private final Creator<ShardManager> delegate;
- DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+ DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
this.delegate = delegate;
}
private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
return new MessageInterceptor() {
@Override
- public Object apply(Object message) {
+ public Object apply(final Object message) {
return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
}
@Override
- public boolean canIntercept(Object message) {
+ public boolean canIntercept(final Object message) {
return message instanceof FindPrimary;
}
};
private final Class<?> requestClass;
@SuppressWarnings("unused")
- MockRespondActor(Class<?> requestClass, Object responseMsg) {
+ MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
this.requestClass = requestClass;
this.responseMsg = responseMsg;
}
@Override
- public void onReceive(Object message) throws Exception {
+ public void onReceive(final Object message) throws Exception {
if (message.equals(CLEAR_RESPONSE)) {
responseMsg = null;
} else {