import akka.util.Timeout;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
* not be passed to actors especially remote actors.
*/
public class ActorUtils {
+ private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
+ private LongAdder ateExceptions = new LongAdder();
+
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (failure instanceof AskTimeoutException) {
+ ateExceptions.increment();
+ }
+ }
+
+ void reset() {
+ ateExceptions = new LongAdder();
+ }
+
+ long sum() {
+ return ateExceptions.sum();
+ }
+
+ @Override
+ public void execute(final Runnable runnable) {
+ // Yes, we are this ugly, but then we are just doing a check + an increment
+ runnable.run();
+ }
+
+ @Override
+ public void reportFailure(final Throwable cause) {
+ LOG.warn("Unexpected failure updating counters", cause);
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
private static final String METRIC_RATE = "rate";
- private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
- new Mapper<Throwable, Throwable>() {
+ private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
@Override
public Throwable apply(final Throwable failure) {
Throwable actualFailure = failure;
public static final String BOUNDED_MAILBOX = "bounded-mailbox";
public static final String COMMIT = "commit";
+ private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
+ private final String selfAddressHostPort;
+ private final Dispatchers dispatchers;
+
private DatastoreContext datastoreContext;
private FiniteDuration operationDuration;
private Timeout operationTimeout;
- private final String selfAddressHostPort;
private TransactionRateLimiter txRateLimiter;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
- private final Dispatchers dispatchers;
- private volatile SchemaContext schemaContext;
+ private volatile EffectiveModelContext schemaContext;
// Used as a write memory barrier.
@SuppressWarnings("unused")
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
- this.dispatchers = new Dispatchers(actorSystem.dispatchers());
+ dispatchers = new Dispatchers(actorSystem.dispatchers());
this.primaryShardInfoCache = primaryShardInfoCache;
-
- final LogicalDatastoreType convertedType =
- LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
- this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
+ shardStrategyFactory = new ShardStrategyFactory(configuration);
setCachedProperties();
} else {
selfAddressHostPort = null;
}
-
}
private void setCachedProperties() {
return actorSystem.actorSelection(actorPath);
}
- public void setSchemaContext(final SchemaContext schemaContext) {
+ public void setSchemaContext(final EffectiveModelContext schemaContext) {
this.schemaContext = schemaContext;
if (shardManager != null) {
}
public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
- this.datastoreContext = contextFactory.getBaseDatastoreContext();
+ datastoreContext = contextFactory.getBaseDatastoreContext();
setCachedProperties();
// We write the 'updated' volatile to trigger a write memory barrier so that the writes above
}
}
- public SchemaContext getSchemaContext() {
+ public EffectiveModelContext getSchemaContext() {
return schemaContext;
}
}
private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
- final short primaryVersion, final DataTree localShardDataTree) {
+ final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
return Optional.of(found.getPath());
}
- return Optional.absent();
+ return Optional.empty();
}
/**
return txRateLimiter.getTxCreationLimit();
}
+ public long getAskTimeoutExceptionCount() {
+ return askTimeoutCounter.sum();
+ }
+
+ public void resetAskTimeoutExceptionCount() {
+ askTimeoutCounter.reset();
+ }
+
/**
* Try to acquire a transaction creation permit. Will block if no permits are available.
*/
* @return the dispatcher
*/
public ExecutionContext getClientDispatcher() {
- return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+ return dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
}
public String getNotificationDispatcherPath() {
- return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
+ return dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
}
public Configuration getConfiguration() {
}
protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
- return ask(actorRef, message, timeout);
+ final Future<Object> ret = ask(actorRef, message, timeout);
+ ret.onComplete(askTimeoutCounter, askTimeoutCounter);
+ return ret;
}
public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {