import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private final DatastoreContext datastoreContext;
- private final String dataStoreType;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private final Timeout transactionCommitOperationTimeout;
+ private final Dispatchers dispatchers;
private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this(actorSystem, shardManager, clusterWrapper, configuration,
- DatastoreContext.newBuilder().build(), UNKNOWN_DATA_STORE_TYPE);
+ DatastoreContext.newBuilder().build());
}
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration,
- DatastoreContext datastoreContext, String dataStoreType) {
+ DatastoreContext datastoreContext) {
this.actorSystem = actorSystem;
this.shardManager = shardManager;
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
- this.dataStoreType = dataStoreType;
this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+ this.dispatchers = new Dispatchers(actorSystem.dispatchers());
operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
jmxReporter.start();
+
}
public DatastoreContext getDatastoreContext() {
throw new UnknownMessageException(String.format(
"FindPrimary returned unkown response: %s", response));
}
- }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
/**
throw new UnknownMessageException(String.format(
"FindLocalShard returned unkown response: %s", response));
}
- }, getActorSystem().dispatcher());
+ }, getClientDispatcher());
}
private String findPrimaryPathOrNull(String shardName) {
* @return
*/
public Timer getOperationTimer(String operationName){
- final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName, METRIC_RATE);
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
return metricRegistry.timer(rate);
}
* @return
*/
public String getDataStoreType() {
- return dataStoreType;
+ return datastoreContext.getDataStoreType();
}
/**
return transactionCommitOperationTimeout;
}
+ /**
+ * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
+ * code on the datastore
+ * @return
+ */
+ public ExecutionContext getClientDispatcher() {
+ return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+ }
+
+ public String getNotificationDispatcherPath(){
+ return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
+ }
}