X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorUtils.java;h=fcfa9296aeb554e0da8057c8168c8d98ac34c06b;hb=27aa99100f4860edafa4d62d6fa2188f79348d98;hp=a1efbf2d9b8d42c5e0a5d4e8c6c2ad691f12d578;hpb=f662ce8b1fa94b77ba66f7ece8bcaff91dee809e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java index a1efbf2d9b..fcfa9296ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java @@ -21,10 +21,12 @@ import akka.pattern.Patterns; import akka.util.Timeout; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.Dispatchers; @@ -52,7 +54,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.reporting.MetricsReporter; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -66,11 +68,40 @@ import scala.concurrent.duration.FiniteDuration; * not be passed to actors especially remote actors. */ public class ActorUtils { + private static final class AskTimeoutCounter extends OnComplete implements ExecutionContext { + private LongAdder ateExceptions = new LongAdder(); + + @Override + public void onComplete(final Throwable failure, final Object success) throws Throwable { + if (failure instanceof AskTimeoutException) { + ateExceptions.increment(); + } + } + + void reset() { + ateExceptions = new LongAdder(); + } + + long sum() { + return ateExceptions.sum(); + } + + @Override + public void execute(final Runnable runnable) { + // Yes, we are this ugly, but then we are just doing a check + an increment + runnable.run(); + } + + @Override + public void reportFailure(final Throwable cause) { + LOG.warn("Unexpected failure updating counters", cause); + } + } + private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class); private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; private static final String METRIC_RATE = "rate"; - private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = - new Mapper() { + private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() { @Override public Throwable apply(final Throwable failure) { Throwable actualFailure = failure; @@ -87,20 +118,22 @@ public class ActorUtils { public static final String BOUNDED_MAILBOX = "bounded-mailbox"; public static final String COMMIT = "commit"; + private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter(); private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; + private final String selfAddressHostPort; + private final Dispatchers dispatchers; + private DatastoreContext datastoreContext; private FiniteDuration operationDuration; private Timeout operationTimeout; - private final String selfAddressHostPort; private TransactionRateLimiter txRateLimiter; private Timeout transactionCommitOperationTimeout; private Timeout shardInitializationTimeout; - private final Dispatchers dispatchers; - private volatile SchemaContext schemaContext; + private volatile EffectiveModelContext schemaContext; // Used as a write memory barrier. @SuppressWarnings("unused") @@ -141,7 +174,6 @@ public class ActorUtils { } else { selfAddressHostPort = null; } - } private void setCachedProperties() { @@ -177,7 +209,7 @@ public class ActorUtils { return actorSystem.actorSelection(actorPath); } - public void setSchemaContext(final SchemaContext schemaContext) { + public void setSchemaContext(final EffectiveModelContext schemaContext) { this.schemaContext = schemaContext; if (shardManager != null) { @@ -203,7 +235,7 @@ public class ActorUtils { } } - public SchemaContext getSchemaContext() { + public EffectiveModelContext getSchemaContext() { return schemaContext; } @@ -241,6 +273,8 @@ public class ActorUtils { }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath, final short primaryVersion, final ReadOnlyDataTree localShardDataTree) { ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath); @@ -266,7 +300,7 @@ public class ActorUtils { return Optional.of(found.getPath()); } - return Optional.absent(); + return Optional.empty(); } /** @@ -500,6 +534,14 @@ public class ActorUtils { return txRateLimiter.getTxCreationLimit(); } + public long getAskTimeoutExceptionCount() { + return askTimeoutCounter.sum(); + } + + public void resetAskTimeoutExceptionCount() { + askTimeoutCounter.reset(); + } + /** * Try to acquire a transaction creation permit. Will block if no permits are available. */ @@ -543,7 +585,9 @@ public class ActorUtils { } protected Future doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) { - return ask(actorRef, message, timeout); + final Future ret = ask(actorRef, message, timeout); + ret.onComplete(askTimeoutCounter, askTimeoutCounter); + return ret; } public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {