X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=2a47cae8b312a560ae0fe4ecc81d1216c91165c0;hb=refs%2Fchanges%2F63%2F59463%2F3;hp=5b4f54daf8ea9b56f43e1afa1eb3f38dc9b5c1fd;hpb=3b0499cd187bcdeda057465350d381c8bc28847c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 5b4f54daf8..2a47cae8b3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -9,35 +9,31 @@ package org.opendaylight.controller.cluster.datastore.utils; import static akka.pattern.Patterns.ask; + import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; -import akka.actor.PoisonPill; -import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.util.Timeout; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import java.util.ArrayList; -import java.util.Collection; import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.common.actor.CommonConfig; +import java.util.function.Function; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -52,7 +48,10 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.reporting.MetricsReporter; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -69,7 +68,7 @@ import scala.concurrent.duration.FiniteDuration; * easily. An ActorContext can be freely passed around to local object instances * but should not be passed to actors especially remote actors */ -public class ActorContext implements RemovalListener> { +public class ActorContext { private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; private static final String METRIC_RATE = "rate"; @@ -78,17 +77,17 @@ public class ActorContext implements RemovalListener> primaryShardInfoCache; private volatile SchemaContext schemaContext; + + // Used as a write memory barrier. + @SuppressWarnings("unused") private volatile boolean updated; - private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry(); - @GuardedBy("shardInfoListeners") - private final Collection> shardInfoListeners = new ArrayList<>(); + + private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN) + .getMetricsRegistry(); + + private final PrimaryShardInfoFutureCache primaryShardInfoCache; + private final ShardStrategyFactory shardStrategyFactory; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { this(actorSystem, shardManager, clusterWrapper, configuration, - DatastoreContext.newBuilder().build()); + DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); } public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) { this.actorSystem = actorSystem; this.shardManager = shardManager; this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; this.dispatchers = new Dispatchers(actorSystem.dispatchers()); + this.primaryShardInfoCache = primaryShardInfoCache; + + final LogicalDatastoreType convertedType = + LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name()); + this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType); setCachedProperties(); @@ -137,24 +145,18 @@ public class ActorContext implements RemovalListener findPrimaryShardAsync(final String shardName) { Future ret = primaryShardInfoCache.getIfPresent(shardName); - if(ret != null){ + if (ret != null) { return ret; } Future future = executeOperationAsync(shardManager, @@ -217,19 +219,21 @@ public class ActorContext implements RemovalListener() { @Override - public PrimaryShardInfo checkedApply(Object response) throws Exception { - if(response instanceof RemotePrimaryShardFound) { + public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException { + if (response instanceof RemotePrimaryShardFound) { LOG.debug("findPrimaryShardAsync received: {}", response); - return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null); - } else if(response instanceof LocalPrimaryShardFound) { + RemotePrimaryShardFound found = (RemotePrimaryShardFound)response; + return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null); + } else if (response instanceof LocalPrimaryShardFound) { LOG.debug("findPrimaryShardAsync received: {}", response); LocalPrimaryShardFound found = (LocalPrimaryShardFound)response; - return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree()); - } else if(response instanceof NotInitializedException) { + return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION, + found.getLocalShardDataTree()); + } else if (response instanceof NotInitializedException) { throw (NotInitializedException)response; - } else if(response instanceof PrimaryNotFoundException) { + } else if (response instanceof PrimaryNotFoundException) { throw (PrimaryNotFoundException)response; - } else if(response instanceof NoShardLeaderException) { + } else if (response instanceof NoShardLeaderException) { throw (NoShardLeaderException)response; } @@ -240,21 +244,16 @@ public class ActorContext implements RemovalListener reg : shardInfoListeners) { - reg.getInstance().onShardInfoUpdated(shardName, info); - } - } + PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) : + new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree); + primaryShardInfoCache.putSuccessful(shardName, info); return info; } /** - * Finds a local shard given its shard name and return it's ActorRef + * Finds a local shard given its shard name and return it's ActorRef. * * @param shardName the name of the local shard that needs to be found * @return a reference to a local shard actor which represents the shard @@ -278,20 +277,20 @@ public class ActorContext implements RemovalListener findLocalShardAsync( final String shardName) { + public Future findLocalShardAsync(final String shardName) { Future future = executeOperationAsync(shardManager, new FindLocalShard(shardName, true), shardInitializationTimeout); return future.map(new Mapper() { @Override public ActorRef checkedApply(Object response) throws Throwable { - if(response instanceof LocalShardFound) { + if (response instanceof LocalShardFound) { LocalShardFound found = (LocalShardFound)response; LOG.debug("Local shard found {}", found.getPath()); return found.getPath(); - } else if(response instanceof NotInitializedException) { + } else if (response instanceof NotInitializedException) { throw (NotInitializedException)response; - } else if(response instanceof LocalShardNotFound) { + } else if (response instanceof LocalShardNotFound) { throw new LocalShardNotFoundException( String.format("Local shard for %s does not exist.", shardName)); } @@ -303,49 +302,51 @@ public class ActorContext implements RemovalListener future = executeOperationAsync(actor, message, operationTimeout); try { return Await.result(future, operationDuration); } catch (Exception e) { - throw new TimeoutException("Sending message " + message.getClass().toString() + - " to actor " + actor.toString() + " failed. Try again later.", e); + throw new TimeoutException("Sending message " + message.getClass().toString() + + " to actor " + actor.toString() + " failed. Try again later.", e); } } - public Future executeOperationAsync(ActorRef actor, Object message, Timeout timeout) { - Preconditions.checkArgument(actor != null, "actor must not be null"); - Preconditions.checkArgument(message != null, "message must not be null"); - - LOG.debug("Sending message {} to {}", message.getClass(), actor); - return doAsk(actor, message, timeout); - } - /** - * Execute an operation on a remote actor and wait for it's response + * Execute an operation on a remote actor and wait for it's response. * - * @param actor - * @param message - * @return + * @param actor the actor + * @param message the message + * @return the response message */ + @SuppressWarnings("checkstyle:IllegalCatch") public Object executeOperation(ActorSelection actor, Object message) { Future future = executeOperationAsync(actor, message); try { return Await.result(future, operationDuration); } catch (Exception e) { - throw new TimeoutException("Sending message " + message.getClass().toString() + - " to actor " + actor.toString() + " failed. Try again later.", e); + throw new TimeoutException("Sending message " + message.getClass().toString() + + " to actor " + actor.toString() + " failed. Try again later.", e); } } + public Future executeOperationAsync(ActorRef actor, Object message, Timeout timeout) { + Preconditions.checkArgument(actor != null, "actor must not be null"); + Preconditions.checkArgument(message != null, "message must not be null"); + + LOG.debug("Sending message {} to {}", message.getClass(), actor); + return doAsk(actor, message, timeout); + } + /** * Execute an operation on a remote actor asynchronously. * @@ -391,34 +392,39 @@ public class ActorContext implements RemovalListener messageSupplier, Class messageClass) { + for (final String shardName : configuration.getAllShardNames()) { Future primaryFuture = findPrimaryShardAsync(shardName); primaryFuture.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { - if(failure != null) { + if (failure != null) { LOG.warn("broadcast failed to send message {} to shard {}: {}", - message.getClass().getSimpleName(), shardName, failure); + messageClass.getSimpleName(), shardName, failure); } else { + Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion()); primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender()); } } @@ -430,6 +436,10 @@ public class ActorContext implements RemovalListener doAsk(ActorRef actorRef, Object message, Timeout timeout){ - return ask(actorRef, message, timeout); + public ShardStrategyFactory getShardStrategyFactory() { + return shardStrategyFactory; } - protected Future doAsk(ActorSelection actorRef, Object message, Timeout timeout){ + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return ask(actorRef, message, timeout); } - @VisibleForTesting - Cache> getPrimaryShardInfoCache() { - return primaryShardInfoCache; - } - - public ShardInfoListenerRegistration registerShardInfoListener(final T listener) { - final ShardInfoListenerRegistration reg = new ShardInfoListenerRegistration(listener, this); - - synchronized (shardInfoListeners) { - shardInfoListeners.add(reg); - } - return reg; - } - - protected void removeShardInfoListener(final ShardInfoListenerRegistration registration) { - synchronized (shardInfoListeners) { - shardInfoListeners.remove(registration); - } + protected Future doAsk(ActorSelection actorRef, Object message, Timeout timeout) { + return ask(actorRef, message, timeout); } - @Override - public void onRemoval(final RemovalNotification> notification) { - synchronized (shardInfoListeners) { - for (ShardInfoListenerRegistration reg : shardInfoListeners) { - reg.getInstance().onShardInfoUpdated(notification.getKey(), null); - } - } + public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() { + return primaryShardInfoCache; } }