Expose AskTimeoutException counter from DatastoreInfoMXBean
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
index aaf3d33db078b78a7d944724844612bb04926265..fcfa9296aeb554e0da8057c8168c8d98ac34c06b 100644 (file)
@@ -26,6 +26,7 @@ import com.google.common.base.Strings;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
@@ -67,11 +68,40 @@ import scala.concurrent.duration.FiniteDuration;
  * not be passed to actors especially remote actors.
  */
 public class ActorUtils {
+    private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
+        private LongAdder ateExceptions = new LongAdder();
+
+        @Override
+        public void onComplete(final Throwable failure, final Object success) throws Throwable {
+            if (failure instanceof AskTimeoutException) {
+                ateExceptions.increment();
+            }
+        }
+
+        void reset() {
+            ateExceptions = new LongAdder();
+        }
+
+        long sum() {
+            return ateExceptions.sum();
+        }
+
+        @Override
+        public void execute(final Runnable runnable) {
+            // Yes, we are this ugly, but then we are just doing a check + an increment
+            runnable.run();
+        }
+
+        @Override
+        public void reportFailure(final Throwable cause) {
+            LOG.warn("Unexpected failure updating counters", cause);
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
-    private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
-                                                              new Mapper<>() {
+    private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
         @Override
         public Throwable apply(final Throwable failure) {
             Throwable actualFailure = failure;
@@ -88,18 +118,20 @@ public class ActorUtils {
     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
     public static final String COMMIT = "commit";
 
+    private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
+    private final String selfAddressHostPort;
+    private final Dispatchers dispatchers;
+
     private DatastoreContext datastoreContext;
     private FiniteDuration operationDuration;
     private Timeout operationTimeout;
-    private final String selfAddressHostPort;
     private TransactionRateLimiter txRateLimiter;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
-    private final Dispatchers dispatchers;
 
     private volatile EffectiveModelContext schemaContext;
 
@@ -142,7 +174,6 @@ public class ActorUtils {
         } else {
             selfAddressHostPort = null;
         }
-
     }
 
     private void setCachedProperties() {
@@ -503,6 +534,14 @@ public class ActorUtils {
         return txRateLimiter.getTxCreationLimit();
     }
 
+    public long getAskTimeoutExceptionCount() {
+        return askTimeoutCounter.sum();
+    }
+
+    public void resetAskTimeoutExceptionCount() {
+        askTimeoutCounter.reset();
+    }
+
     /**
      * Try to acquire a transaction creation permit. Will block if no permits are available.
      */
@@ -546,7 +585,9 @@ public class ActorUtils {
     }
 
     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
-        return ask(actorRef, message, timeout);
+        final Future<Object> ret = ask(actorRef, message, timeout);
+        ret.onComplete(askTimeoutCounter, askTimeoutCounter);
+        return ret;
     }
 
     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {