Merge "Bug 2358: Fixed warnings in Restconf"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index 6f9bb7fc9feb4ede3a00d06b07d689d615f51458..17d988005fadf32c8209837671d43d0aa2981b60 100644 (file)
@@ -20,7 +20,6 @@ import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
-import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
@@ -41,14 +40,13 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,10 +64,8 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class ActorContext {
     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
-    private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
-    private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -96,8 +92,6 @@ public class ActorContext {
     private Timeout operationTimeout;
     private final String selfAddressHostPort;
     private RateLimiter txRateLimiter;
-    private final MetricRegistry metricRegistry = new MetricRegistry();
-    private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
@@ -106,6 +100,7 @@ public class ActorContext {
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
+    private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -133,8 +128,6 @@ public class ActorContext {
         }
 
         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
-        jmxReporter.start();
-
     }
 
     private void setCachedProperties() {
@@ -209,25 +202,22 @@ public class ActorContext {
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
+                new FindPrimary(shardName, true), shardInitializationTimeout);
 
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
             public ActorSelection checkedApply(Object response) throws Exception {
-                if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
-                    PrimaryFound found = PrimaryFound.fromSerializable(response);
+                if(response instanceof PrimaryFound) {
+                    PrimaryFound found = (PrimaryFound)response;
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
                     return actorSelection;
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found primary shard %s but it's not initialized yet. " +
-                                          "Please try again later", shardName));
-                } else if(response instanceof PrimaryNotFound) {
-                    throw new PrimaryNotFoundException(
-                            String.format("No primary shard found for %S.", shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
+                } else if(response instanceof PrimaryNotFoundException) {
+                    throw (PrimaryNotFoundException)response;
                 } else if(response instanceof NoShardLeaderException) {
                     throw (NoShardLeaderException)response;
                 }
@@ -274,10 +264,8 @@ public class ActorContext {
                     LocalShardFound found = (LocalShardFound)response;
                     LOG.debug("Local shard found {}", found.getPath());
                     return found.getPath();
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found local shard for %s but it's not initialized yet.",
-                                    shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
                 } else if(response instanceof LocalShardNotFound) {
                     throw new LocalShardNotFoundException(
                             String.format("Local shard for %s does not exist.", shardName));
@@ -489,7 +477,12 @@ public class ActorContext {
      * @return
      */
     public Timer getOperationTimer(String operationName){
-        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+        return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
+    }
+
+    public Timer getOperationTimer(String dataStoreType, String operationName){
+        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
+                operationName, METRIC_RATE);
         return metricRegistry.timer(rate);
     }
 
@@ -547,6 +540,10 @@ public class ActorContext {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
         return ask(actorRef, message, timeout);
     }