import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
* not be passed to actors especially remote actors.
*/
public class ActorUtils {
+ private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
+ private LongAdder ateExceptions = new LongAdder();
+
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (failure instanceof AskTimeoutException) {
+ ateExceptions.increment();
+ }
+ }
+
+ void reset() {
+ ateExceptions = new LongAdder();
+ }
+
+ long sum() {
+ return ateExceptions.sum();
+ }
+
+ @Override
+ public void execute(final Runnable runnable) {
+ // Yes, we are this ugly, but then we are just doing a check + an increment
+ runnable.run();
+ }
+
+ @Override
+ public void reportFailure(final Throwable cause) {
+ LOG.warn("Unexpected failure updating counters", cause);
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
private static final String METRIC_RATE = "rate";
- private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
- new Mapper<>() {
+ private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
@Override
public Throwable apply(final Throwable failure) {
Throwable actualFailure = failure;
public static final String BOUNDED_MAILBOX = "bounded-mailbox";
public static final String COMMIT = "commit";
+ private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
+ private final String selfAddressHostPort;
+ private final Dispatchers dispatchers;
+
private DatastoreContext datastoreContext;
private FiniteDuration operationDuration;
private Timeout operationTimeout;
- private final String selfAddressHostPort;
private TransactionRateLimiter txRateLimiter;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
- private final Dispatchers dispatchers;
private volatile EffectiveModelContext schemaContext;
} else {
selfAddressHostPort = null;
}
-
}
private void setCachedProperties() {
return txRateLimiter.getTxCreationLimit();
}
+ public long getAskTimeoutExceptionCount() {
+ return askTimeoutCounter.sum();
+ }
+
+ public void resetAskTimeoutExceptionCount() {
+ askTimeoutCounter.reset();
+ }
+
/**
* Try to acquire a transaction creation permit. Will block if no permits are available.
*/
}
protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
- return ask(actorRef, message, timeout);
+ final Future<Object> ret = ask(actorRef, message, timeout);
+ ret.onComplete(askTimeoutCounter, askTimeoutCounter);
+ return ret;
}
public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {