import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import akka.actor.Address;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import java.lang.invoke.VarHandle;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
@Override
public Throwable apply(final Throwable failure) {
- Throwable actualFailure = failure;
if (failure instanceof AskTimeoutException) {
// A timeout exception most likely means the shard isn't initialized.
- actualFailure = new NotInitializedException(
+ return new NotInitializedException(
"Timed out trying to find the primary shard. Most likely cause is the "
+ "shard is not initialized yet.");
}
-
- return actualFailure;
+ return failure;
}
};
public static final String BOUNDED_MAILBOX = "bounded-mailbox";
private volatile EffectiveModelContext schemaContext;
- // Used as a write memory barrier.
- @SuppressWarnings("unused")
- private volatile boolean updated;
-
private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
.getMetricsRegistry();
setCachedProperties();
- Address selfAddress = clusterWrapper.getSelfAddress();
+ final var selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
} else {
TimeUnit.MILLISECONDS);
operationTimeout = new Timeout(operationDuration);
- transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
+ transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
datastoreContext = contextFactory.getBaseDatastoreContext();
setCachedProperties();
- // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
- // will be published immediately even though they may not be immediately visible to other
- // threads due to unsynchronized reads. That's OK though - we're going for eventual
- // consistency here as immediately visible updates to these members aren't critical. These
- // members could've been made volatile but wanted to avoid volatile reads as these are
- // accessed often and updates will be infrequent.
-
- updated = true;
+ // Trigger a write memory barrier so that the writes above will be published immediately even though they may
+ // not be immediately visible to other threads due to unsynchronized reads. That is OK though - we are going for
+ // eventual consistency here as immediately visible updates to these members are not critical. These members
+ // could have been made volatile but wanted to avoid volatile reads as these are accessed often and updates will
+ // be infrequent.
+ VarHandle.fullFence();
if (shardManager != null) {
shardManager.tell(contextFactory, ActorRef.noSender());
}
public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
- Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
+ final var ret = primaryShardInfoCache.getIfPresent(shardName);
if (ret != null) {
return ret;
}
- Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true), shardInitializationTimeout);
-
- return future.transform(new Mapper<Object, PrimaryShardInfo>() {
- @Override
- public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
- if (response instanceof RemotePrimaryShardFound) {
- LOG.debug("findPrimaryShardAsync received: {}", response);
- RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
- return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
- } else if (response instanceof LocalPrimaryShardFound) {
- LOG.debug("findPrimaryShardAsync received: {}", response);
- LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
- return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
+
+ return executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout)
+ .transform(new Mapper<>() {
+ @Override
+ public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
+ if (response instanceof RemotePrimaryShardFound found) {
+ LOG.debug("findPrimaryShardAsync received: {}", found);
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
+ } else if (response instanceof LocalPrimaryShardFound found) {
+ LOG.debug("findPrimaryShardAsync received: {}", found);
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
found.getLocalShardDataTree());
- } else if (response instanceof NotInitializedException) {
- throw (NotInitializedException)response;
- } else if (response instanceof PrimaryNotFoundException) {
- throw (PrimaryNotFoundException)response;
- } else if (response instanceof NoShardLeaderException) {
- throw (NoShardLeaderException)response;
- }
+ } else if (response instanceof NotInitializedException notInitialized) {
+ throw notInitialized;
+ } else if (response instanceof PrimaryNotFoundException primaryNotFound) {
+ throw primaryNotFound;
+ } else if (response instanceof NoShardLeaderException noShardLeader) {
+ throw noShardLeader;
+ }
- throw new UnknownMessageException(String.format(
+ throw new UnknownMessageException(String.format(
"FindPrimary returned unkown response: %s", response));
- }
- }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
+ }
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
- ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
- PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
+ final var actorSelection = actorSystem.actorSelection(primaryActorPath);
+ final var info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
primaryShardInfoCache.putSuccessful(shardName, info);
return info;
* specified by the shardName
*/
public Optional<ActorRef> findLocalShard(final String shardName) {
- Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
-
- if (result instanceof LocalShardFound) {
- LocalShardFound found = (LocalShardFound) result;
+ final var result = executeOperation(shardManager, new FindLocalShard(shardName, false));
+ if (result instanceof LocalShardFound found) {
LOG.debug("Local shard found {}", found.getPath());
return Optional.of(found.getPath());
}
* @param shardName the name of the local shard that needs to be found
*/
public Future<ActorRef> findLocalShardAsync(final String shardName) {
- Future<Object> future = executeOperationAsync(shardManager,
- new FindLocalShard(shardName, true), shardInitializationTimeout);
-
- return future.map(new Mapper<Object, ActorRef>() {
- @Override
- public ActorRef checkedApply(final Object response) throws Throwable {
- if (response instanceof LocalShardFound) {
- LocalShardFound found = (LocalShardFound)response;
- LOG.debug("Local shard found {}", found.getPath());
- return found.getPath();
- } else if (response instanceof NotInitializedException) {
- throw (NotInitializedException)response;
- } else if (response instanceof LocalShardNotFound) {
- throw new LocalShardNotFoundException(
+ return executeOperationAsync(shardManager, new FindLocalShard(shardName, true), shardInitializationTimeout)
+ .map(new Mapper<>() {
+ @Override
+ public ActorRef checkedApply(final Object response) throws Throwable {
+ if (response instanceof LocalShardFound found) {
+ LOG.debug("Local shard found {}", found.getPath());
+ return found.getPath();
+ } else if (response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if (response instanceof LocalShardNotFound) {
+ throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
- }
+ }
- throw new UnknownMessageException(String.format(
- "FindLocalShard returned unkown response: %s", response));
- }
- }, getClientDispatcher());
+ throw new UnknownMessageException("FindLocalShard returned unkown response: " + response);
+ }
+ }, getClientDispatcher());
}
/**
@SuppressWarnings("checkstyle:IllegalCatch")
public void shutdown() {
- FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
+ final var duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
try {
Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
} catch (Exception e) {
public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
for (final String shardName : configuration.getAllShardNames()) {
- Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
- primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
+ final var primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
if (failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
shardName, failure);
} else {
- Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
+ final var message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}
return false;
}
- String hostPort = path.substring(pathAtIndex + 1, slashIndex);
+ final var hostPort = path.substring(pathAtIndex + 1, slashIndex);
return hostPort.equals(selfAddressHostPort);
} else {
}
public Timer getOperationTimer(final String dataStoreType, final String operationName) {
- final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
- operationName, METRIC_RATE);
+ final var rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName,
+ METRIC_RATE);
return metricRegistry.timer(rate);
}
}
protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
- final Future<Object> ret = ask(actorRef, message, timeout);
+ final var ret = ask(actorRef, message, timeout);
ret.onComplete(askTimeoutCounter, askTimeoutCounter);
return ret;
}