private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
- public Throwable apply(Throwable failure) {
+ public Throwable apply(final Throwable failure) {
Throwable actualFailure = failure;
if (failure instanceof AskTimeoutException) {
// A timeout exception most likely means the shard isn't initialized.
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
private final ShardStrategyFactory shardStrategyFactory;
- public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
- ClusterWrapper clusterWrapper, Configuration configuration) {
+ public ActorContext(final ActorSystem actorSystem, final ActorRef shardManager,
+ final ClusterWrapper clusterWrapper, final Configuration configuration) {
this(actorSystem, shardManager, clusterWrapper, configuration,
DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
}
- public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
- ClusterWrapper clusterWrapper, Configuration configuration,
- DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ public ActorContext(final ActorSystem actorSystem, final ActorRef shardManager,
+ final ClusterWrapper clusterWrapper, final Configuration configuration,
+ final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.actorSystem = actorSystem;
this.shardManager = shardManager;
this.clusterWrapper = clusterWrapper;
return shardManager;
}
- public ActorSelection actorSelection(String actorPath) {
+ public ActorSelection actorSelection(final String actorPath) {
return actorSystem.actorSelection(actorPath);
}
- public ActorSelection actorSelection(ActorPath actorPath) {
+ public ActorSelection actorSelection(final ActorPath actorPath) {
return actorSystem.actorSelection(actorPath);
}
- public void setSchemaContext(SchemaContext schemaContext) {
+ public void setSchemaContext(final SchemaContext schemaContext) {
this.schemaContext = schemaContext;
if (shardManager != null) {
}
}
- public void setDatastoreContext(DatastoreContextFactory contextFactory) {
+ public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
this.datastoreContext = contextFactory.getBaseDatastoreContext();
setCachedProperties();
return future.transform(new Mapper<Object, PrimaryShardInfo>() {
@Override
- public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
+ public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
if (response instanceof RemotePrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
}, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
- private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
- short primaryVersion, DataTree localShardDataTree) {
+ private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
+ final short primaryVersion, final DataTree localShardDataTree) {
ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
* @return a reference to a local shard actor which represents the shard
* specified by the shardName
*/
- public Optional<ActorRef> findLocalShard(String shardName) {
+ public Optional<ActorRef> findLocalShard(final String shardName) {
Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
if (result instanceof LocalShardFound) {
return future.map(new Mapper<Object, ActorRef>() {
@Override
- public ActorRef checkedApply(Object response) throws Throwable {
+ public ActorRef checkedApply(final Object response) throws Throwable {
if (response instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
* @return The response of the operation
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- public Object executeOperation(ActorRef actor, Object message) {
+ public Object executeOperation(final ActorRef actor, final Object message) {
Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
try {
* @return the response message
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- public Object executeOperation(ActorSelection actor, Object message) {
+ public Object executeOperation(final ActorSelection actor, final Object message) {
Future<Object> future = executeOperationAsync(actor, message);
try {
}
}
- public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
+ public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
Preconditions.checkArgument(actor != null, "actor must not be null");
Preconditions.checkArgument(message != null, "message must not be null");
* @param timeout the operation timeout
* @return a Future containing the eventual result
*/
- public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
- Timeout timeout) {
+ public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
+ final Timeout timeout) {
Preconditions.checkArgument(actor != null, "actor must not be null");
Preconditions.checkArgument(message != null, "message must not be null");
* @param message the message to send
* @return a Future containing the eventual result
*/
- public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
+ public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
return executeOperationAsync(actor, message, operationTimeout);
}
* @param actor the ActorSelection
* @param message the message to send
*/
- public void sendOperationAsync(ActorSelection actor, Object message) {
+ public void sendOperationAsync(final ActorSelection actor, final Object message) {
Preconditions.checkArgument(actor != null, "actor must not be null");
Preconditions.checkArgument(message != null, "message must not be null");
/**
* Send the message to each and every shard.
*/
- public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass) {
+ public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
for (final String shardName : configuration.getAllShardNames()) {
Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
+ public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
if (failure != null) {
- LOG.warn("broadcast failed to send message {} to shard {}: {}",
- messageClass.getSimpleName(), shardName, failure);
+ LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
+ shardName, failure);
} else {
Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
return operationTimeout;
}
- public boolean isPathLocal(String path) {
+ public boolean isPathLocal(final String path) {
if (Strings.isNullOrEmpty(path)) {
return false;
}
* @param operationName the name of the operation
* @return the Timer instance
*/
- public Timer getOperationTimer(String operationName) {
+ public Timer getOperationTimer(final String operationName) {
return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
}
- public Timer getOperationTimer(String dataStoreType, String operationName) {
+ public Timer getOperationTimer(final String dataStoreType, final String operationName) {
final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
operationName, METRIC_RATE);
return metricRegistry.timer(rate);
return shardStrategyFactory;
}
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
return ask(actorRef, message, timeout);
}
- protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
return ask(actorRef, message, timeout);
}