import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
- private final DatastoreContext datastoreContext;
- private final FiniteDuration operationDuration;
- private final Timeout operationTimeout;
+ private DatastoreContext datastoreContext;
+ private FiniteDuration operationDuration;
+ private Timeout operationTimeout;
private final String selfAddressHostPort;
- private final RateLimiter txRateLimiter;
+ private RateLimiter txRateLimiter;
private final MetricRegistry metricRegistry = new MetricRegistry();
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
- private final Timeout transactionCommitOperationTimeout;
+ private Timeout transactionCommitOperationTimeout;
+ private final Dispatchers dispatchers;
private volatile SchemaContext schemaContext;
+ private volatile boolean updated;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
- this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
-
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
- operationTimeout = new Timeout(operationDuration);
- transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
- TimeUnit.SECONDS));
+ this.dispatchers = new Dispatchers(actorSystem.dispatchers());
+ setCachedProperties();
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
jmxReporter.start();
+
+ }
+
+ private void setCachedProperties() {
+ txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+
+ transactionCommitOperationTimeout = new Timeout(Duration.create(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
}
public DatastoreContext getDatastoreContext() {
this.schemaContext = schemaContext;
if(shardManager != null) {
- shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+ }
+ }
+
+ public void setDatastoreContext(DatastoreContext context) {
+ this.datastoreContext = context;
+ setCachedProperties();
+
+ // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
+ // will be published immediately even though they may not be immediately visible to other
+ // threads due to unsynchronized reads. That's OK though - we're going for eventual
+ // consistency here as immediately visible updates to these members aren't critical. These
+ // members could've been made volatile but wanted to avoid volatile reads as these are
+ // accessed often and updates will be infrequent.
+
+ updated = true;
+
+ if(shardManager != null) {
+ shardManager.tell(context, ActorRef.noSender());
}
}
throw new UnknownMessageException(String.format(
"FindPrimary returned unkown response: %s", response));
}
- }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
/**
throw new UnknownMessageException(String.format(
"FindLocalShard returned unkown response: %s", response));
}
- }, getActorSystem().dispatcher());
+ }, getClientDispatcher());
}
private String findPrimaryPathOrNull(String shardName) {
return transactionCommitOperationTimeout;
}
+ /**
+ * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
+ * code on the datastore
+ * @return
+ */
+ public ExecutionContext getClientDispatcher() {
+ return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+ }
+
+ public String getNotificationDispatcherPath(){
+ return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
+ }
}