Add UnsignedLongBitmap
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
index 3007e4b1872474c8c5eb095b09aa776bcfd651be..a9a646cc9f112e128e302bedc0457bcb814ce8bd 100644 (file)
@@ -23,8 +23,10 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
@@ -50,9 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -66,11 +67,40 @@ import scala.concurrent.duration.FiniteDuration;
  * not be passed to actors especially remote actors.
  */
 public class ActorUtils {
+    private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
+        private LongAdder ateExceptions = new LongAdder();
+
+        @Override
+        public void onComplete(final Throwable failure, final Object success) throws Throwable {
+            if (failure instanceof AskTimeoutException) {
+                ateExceptions.increment();
+            }
+        }
+
+        void reset() {
+            ateExceptions = new LongAdder();
+        }
+
+        long sum() {
+            return ateExceptions.sum();
+        }
+
+        @Override
+        public void execute(final Runnable runnable) {
+            // Yes, we are this ugly, but then we are just doing a check + an increment
+            runnable.run();
+        }
+
+        @Override
+        public void reportFailure(final Throwable cause) {
+            LOG.warn("Unexpected failure updating counters", cause);
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
-    private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
-                                                              new Mapper<Throwable, Throwable>() {
+    private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
         @Override
         public Throwable apply(final Throwable failure) {
             Throwable actualFailure = failure;
@@ -87,20 +117,22 @@ public class ActorUtils {
     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
     public static final String COMMIT = "commit";
 
+    private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
+    private final String selfAddressHostPort;
+    private final Dispatchers dispatchers;
+
     private DatastoreContext datastoreContext;
     private FiniteDuration operationDuration;
     private Timeout operationTimeout;
-    private final String selfAddressHostPort;
     private TransactionRateLimiter txRateLimiter;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
-    private final Dispatchers dispatchers;
 
-    private volatile SchemaContext schemaContext;
+    private volatile EffectiveModelContext schemaContext;
 
     // Used as a write memory barrier.
     @SuppressWarnings("unused")
@@ -128,10 +160,7 @@ public class ActorUtils {
         this.datastoreContext = datastoreContext;
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
         this.primaryShardInfoCache = primaryShardInfoCache;
-
-        final LogicalDatastoreType convertedType =
-                LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
-        this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
+        this.shardStrategyFactory = new ShardStrategyFactory(configuration);
 
         setCachedProperties();
 
@@ -141,7 +170,6 @@ public class ActorUtils {
         } else {
             selfAddressHostPort = null;
         }
-
     }
 
     private void setCachedProperties() {
@@ -177,7 +205,7 @@ public class ActorUtils {
         return actorSystem.actorSelection(actorPath);
     }
 
-    public void setSchemaContext(final SchemaContext schemaContext) {
+    public void setSchemaContext(final EffectiveModelContext schemaContext) {
         this.schemaContext = schemaContext;
 
         if (shardManager != null) {
@@ -203,7 +231,7 @@ public class ActorUtils {
         }
     }
 
-    public SchemaContext getSchemaContext() {
+    public EffectiveModelContext getSchemaContext() {
         return schemaContext;
     }
 
@@ -241,6 +269,8 @@ public class ActorUtils {
         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
@@ -500,6 +530,14 @@ public class ActorUtils {
         return txRateLimiter.getTxCreationLimit();
     }
 
+    public long getAskTimeoutExceptionCount() {
+        return askTimeoutCounter.sum();
+    }
+
+    public void resetAskTimeoutExceptionCount() {
+        askTimeoutCounter.reset();
+    }
+
     /**
      * Try to acquire a transaction creation permit. Will block if no permits are available.
      */
@@ -543,7 +581,9 @@ public class ActorUtils {
     }
 
     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
-        return ask(actorRef, message, timeout);
+        final Future<Object> ret = ask(actorRef, message, timeout);
+        ret.onComplete(askTimeoutCounter, askTimeoutCounter);
+        return ret;
     }
 
     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {