Merge "Create transaction on the backend datastore only when neccessary"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index b6250fc1cccbfbde152a8af7e1e2f4b9b3c6f7cd..afa773b4615e8905fffea10d42dbabd1cb046b0e 100644 (file)
@@ -20,7 +20,6 @@ import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
-import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
@@ -46,7 +45,10 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,10 +66,8 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class ActorContext {
     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
-    private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
-    private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -94,16 +94,15 @@ public class ActorContext {
     private Timeout operationTimeout;
     private final String selfAddressHostPort;
     private RateLimiter txRateLimiter;
-    private final MetricRegistry metricRegistry = new MetricRegistry();
-    private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+    private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
+    private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -131,8 +130,6 @@ public class ActorContext {
         }
 
         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
-        jmxReporter.start();
-
     }
 
     private void setCachedProperties() {
@@ -146,7 +143,7 @@ public class ActorContext {
 
         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
 
-        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+        primaryShardInfoCache = CacheBuilder.newBuilder()
                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
                 .build();
     }
@@ -201,24 +198,25 @@ public class ActorContext {
         return schemaContext;
     }
 
-    public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
-        Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+    public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+        Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
         if(ret != null){
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
                 new FindPrimary(shardName, true), shardInitializationTimeout);
 
-        return future.transform(new Mapper<Object, ActorSelection>() {
+        return future.transform(new Mapper<Object, PrimaryShardInfo>() {
             @Override
-            public ActorSelection checkedApply(Object response) throws Exception {
+            public PrimaryShardInfo checkedApply(Object response) throws Exception {
                 if(response instanceof PrimaryFound) {
                     PrimaryFound found = (PrimaryFound)response;
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
-                    primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
-                    return actorSelection;
+                    PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
+                    primaryShardInfoCache.put(shardName, Futures.successful(info));
+                    return info;
                 } else if(response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
                 } else if(response instanceof PrimaryNotFoundException) {
@@ -392,15 +390,15 @@ public class ActorContext {
     public void broadcast(final Object message){
         for(final String shardName : configuration.getAllShardNames()){
 
-            Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
-            primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+            Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+            primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
                     if(failure != null) {
                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
                                 message.getClass().getSimpleName(), shardName, failure);
                     } else {
-                        primaryShard.tell(message, ActorRef.noSender());
+                        primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
                     }
                 }
             }, getClientDispatcher());
@@ -482,7 +480,12 @@ public class ActorContext {
      * @return
      */
     public Timer getOperationTimer(String operationName){
-        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+        return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
+    }
+
+    public Timer getOperationTimer(String dataStoreType, String operationName){
+        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
+                operationName, METRIC_RATE);
         return metricRegistry.timer(rate);
     }
 
@@ -540,6 +543,10 @@ public class ActorContext {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
         return ask(actorRef, message, timeout);
     }
@@ -549,7 +556,7 @@ public class ActorContext {
     }
 
     @VisibleForTesting
-    Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
-        return primaryShardActorSelectionCache;
+    Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+        return primaryShardInfoCache;
     }
 }