import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
SchemaContextHelper.full(), shardNames);
}
- public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
- boolean waitUntilLeader, String... shardNames) {
+ public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ final boolean waitUntilLeader, final String... shardNames) {
return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
SchemaContextHelper.full(), shardNames);
}
- public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
- boolean waitUntilLeader, SchemaContext schemaContext, String... shardNames) {
- ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
- Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
+ public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
+ final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+ final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
datastoreContextBuilder.dataStoreName(typeName);
dataStore.onGlobalContextUpdated(schemaContext);
- if(waitUntilLeader) {
+ if (waitUntilLeader) {
waitUntilLeader(dataStore.getActorContext(), shardNames);
}
}
public void waitUntilLeader(ActorContext actorContext, String... shardNames) {
- for(String shardName: shardNames) {
+ for (String shardName: shardNames) {
ActorRef shard = findLocalShard(actorContext, shardName);
assertNotNull("Shard was not created for " + shardName, shard);
}
public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
- for(String shardName: shardNames) {
+ for (String shardName: shardNames) {
ActorRef shard = findLocalShard(actorContext, shardName);
assertNotNull("No local shard found for " + shardName, shard);
public void waitForMembersUp(String... otherMembers) {
Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
CurrentClusterState state = Cluster.get(getSystem()).state();
- for(Member m: state.getMembers()) {
- if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
- otherMembersSet.isEmpty()) {
+ for (Member m: state.getMembers()) {
+ if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next())
+ && otherMembersSet.isEmpty()) {
return;
}
}
public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
ActorRef shard = null;
- for(int i = 0; i < 20 * 5 && shard == null; i++) {
+ for (int i = 0; i < 20 * 5 && shard == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
- if(shardReply.isPresent()) {
+ if (shardReply.isPresent()) {
shard = shardReply.get();
}
}
AssertionError lastError = null;
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
- ShardStats shardStats = (ShardStats)actorContext.
- executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
+ while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ ShardStats shardStats = (ShardStats)actorContext
+ .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
try {
verifier.verify(shardStats);
cohort.commit().get(5, TimeUnit.SECONDS);
}
- void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+ void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort)
+ throws Exception {
Boolean canCommit = canCommitFuture.get(7, TimeUnit.SECONDS);
assertEquals("canCommit", true, canCommit);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
throws Exception {
try {
callable.call();
fail("Expected " + expType.getSimpleName());
- } catch(Exception e) {
+ } catch (Exception e) {
assertEquals("Exception type", expType, e.getClass());
}
}
}
}, expType);
- assertExceptionOnCall(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- txChain.newReadWriteTransaction();
- return null;
- }
+ assertExceptionOnCall(() -> {
+ txChain.newReadWriteTransaction();
+ return null;
}, expType);
- assertExceptionOnCall(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- txChain.newReadOnlyTransaction();
- return null;
- }
+ assertExceptionOnCall(() -> {
+ txChain.newReadOnlyTransaction();
+ return null;
}, expType);
}