X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorUtils.java;h=1c7809e626d241df8d202f0953fe82eb4752357d;hb=99f80f27bee37bb23e345420bf14bb7bb4793c28;hp=b1deffefb7c0cd20030d995dff698b623943c389;hpb=14c92df74247c884a43c5aaea2f154992b0ec798;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java index b1deffefb7..1c7809e626 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorUtils.java @@ -23,9 +23,9 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.Dispatchers; @@ -51,9 +51,8 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.reporting.MetricsReporter; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -67,11 +66,40 @@ import scala.concurrent.duration.FiniteDuration; * not be passed to actors especially remote actors. */ public class ActorUtils { + private static final class AskTimeoutCounter extends OnComplete implements ExecutionContext { + private LongAdder ateExceptions = new LongAdder(); + + @Override + public void onComplete(final Throwable failure, final Object success) throws Throwable { + if (failure instanceof AskTimeoutException) { + ateExceptions.increment(); + } + } + + void reset() { + ateExceptions = new LongAdder(); + } + + long sum() { + return ateExceptions.sum(); + } + + @Override + public void execute(final Runnable runnable) { + // Yes, we are this ugly, but then we are just doing a check + an increment + runnable.run(); + } + + @Override + public void reportFailure(final Throwable cause) { + LOG.warn("Unexpected failure updating counters", cause); + } + } + private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class); private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; private static final String METRIC_RATE = "rate"; - private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = - new Mapper() { + private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() { @Override public Throwable apply(final Throwable failure) { Throwable actualFailure = failure; @@ -88,20 +116,22 @@ public class ActorUtils { public static final String BOUNDED_MAILBOX = "bounded-mailbox"; public static final String COMMIT = "commit"; + private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter(); private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; + private final String selfAddressHostPort; + private final Dispatchers dispatchers; + private DatastoreContext datastoreContext; private FiniteDuration operationDuration; private Timeout operationTimeout; - private final String selfAddressHostPort; private TransactionRateLimiter txRateLimiter; private Timeout transactionCommitOperationTimeout; private Timeout shardInitializationTimeout; - private final Dispatchers dispatchers; - private volatile SchemaContext schemaContext; + private volatile EffectiveModelContext schemaContext; // Used as a write memory barrier. @SuppressWarnings("unused") @@ -127,12 +157,9 @@ public class ActorUtils { this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; - this.dispatchers = new Dispatchers(actorSystem.dispatchers()); + dispatchers = new Dispatchers(actorSystem.dispatchers()); this.primaryShardInfoCache = primaryShardInfoCache; - - final LogicalDatastoreType convertedType = - LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name()); - this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType); + shardStrategyFactory = new ShardStrategyFactory(configuration); setCachedProperties(); @@ -142,7 +169,6 @@ public class ActorUtils { } else { selfAddressHostPort = null; } - } private void setCachedProperties() { @@ -178,7 +204,7 @@ public class ActorUtils { return actorSystem.actorSelection(actorPath); } - public void setSchemaContext(final SchemaContext schemaContext) { + public void setSchemaContext(final EffectiveModelContext schemaContext) { this.schemaContext = schemaContext; if (shardManager != null) { @@ -187,7 +213,7 @@ public class ActorUtils { } public void setDatastoreContext(final DatastoreContextFactory contextFactory) { - this.datastoreContext = contextFactory.getBaseDatastoreContext(); + datastoreContext = contextFactory.getBaseDatastoreContext(); setCachedProperties(); // We write the 'updated' volatile to trigger a write memory barrier so that the writes above @@ -204,7 +230,7 @@ public class ActorUtils { } } - public SchemaContext getSchemaContext() { + public EffectiveModelContext getSchemaContext() { return schemaContext; } @@ -242,8 +268,6 @@ public class ActorUtils { }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath, final short primaryVersion, final ReadOnlyDataTree localShardDataTree) { ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath); @@ -503,6 +527,14 @@ public class ActorUtils { return txRateLimiter.getTxCreationLimit(); } + public long getAskTimeoutExceptionCount() { + return askTimeoutCounter.sum(); + } + + public void resetAskTimeoutExceptionCount() { + askTimeoutCounter.reset(); + } + /** * Try to acquire a transaction creation permit. Will block if no permits are available. */ @@ -526,11 +558,11 @@ public class ActorUtils { * @return the dispatcher */ public ExecutionContext getClientDispatcher() { - return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client); + return dispatchers.getDispatcher(Dispatchers.DispatcherType.Client); } public String getNotificationDispatcherPath() { - return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); + return dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); } public Configuration getConfiguration() { @@ -546,7 +578,9 @@ public class ActorUtils { } protected Future doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) { - return ask(actorRef, message, timeout); + final Future ret = ask(actorRef, message, timeout); + ret.onComplete(askTimeoutCounter, askTimeoutCounter); + return ret; } public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {