import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
-import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
public class ActorContext {
private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
- private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
private static final String METRIC_RATE = "rate";
- private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
private Timeout operationTimeout;
private final String selfAddressHostPort;
private RateLimiter txRateLimiter;
- private final MetricRegistry metricRegistry = new MetricRegistry();
- private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
+ private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
}
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
- jmxReporter.start();
-
}
private void setCachedProperties() {
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
+ new FindPrimary(shardName, true), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
- PrimaryFound found = PrimaryFound.fromSerializable(response);
+ if(response instanceof PrimaryFound) {
+ PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
return actorSelection;
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found primary shard %s but it's not initialized yet. " +
- "Please try again later", shardName));
- } else if(response instanceof PrimaryNotFound) {
- throw new PrimaryNotFoundException(
- String.format("No primary shard found for %S.", shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
} else if(response instanceof NoShardLeaderException) {
throw (NoShardLeaderException)response;
}
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
return found.getPath();
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found local shard for %s but it's not initialized yet.",
- shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
} else if(response instanceof LocalShardNotFound) {
throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
* @return
*/
public Timer getOperationTimer(String operationName){
- final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+ return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
+ }
+
+ public Timer getOperationTimer(String dataStoreType, String operationName){
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
+ operationName, METRIC_RATE);
return metricRegistry.timer(rate);
}