Fix warnings/javadocs in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index f53368d8869416cb28340ab272aedcbfab449512..f1a1f145b370720c80e0a9fd1cb619b44cc4c52a 100644 (file)
@@ -9,31 +9,30 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import static akka.pattern.Patterns.ask;
+
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
-import akka.actor.PoisonPill;
-import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import java.util.function.Function;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -42,11 +41,16 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,17 +75,18 @@ public class ActorContext {
         @Override
         public Throwable apply(Throwable failure) {
             Throwable actualFailure = failure;
-            if(failure instanceof AskTimeoutException) {
+            if (failure instanceof AskTimeoutException) {
                 // A timeout exception most likely means the shard isn't initialized.
                 actualFailure = new NotInitializedException(
-                        "Timed out trying to find the primary shard. Most likely cause is the " +
-                        "shard is not initialized yet.");
+                        "Timed out trying to find the primary shard. Most likely cause is the "
+                        "shard is not initialized yet.");
             }
 
             return actualFailure;
         }
     };
-    public static final String MAILBOX = "bounded-mailbox";
+    public static final String BOUNDED_MAILBOX = "bounded-mailbox";
+    public static final String COMMIT = "commit";
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
@@ -91,32 +96,40 @@ public class ActorContext {
     private FiniteDuration operationDuration;
     private Timeout operationTimeout;
     private final String selfAddressHostPort;
-    private RateLimiter txRateLimiter;
-    private final int transactionOutstandingOperationLimit;
+    private TransactionRateLimiter txRateLimiter;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
 
     private volatile SchemaContext schemaContext;
+
+    // Used as a write memory barrier.
+    @SuppressWarnings("unused")
     private volatile boolean updated;
-    private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
+
+    private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
+            .getMetricsRegistry();
+
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+    private final ShardStrategyFactory shardStrategyFactory;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
         this(actorSystem, shardManager, clusterWrapper, configuration,
-                DatastoreContext.newBuilder().build());
+                DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
     }
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
         this.actorSystem = actorSystem;
         this.shardManager = shardManager;
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
+        this.primaryShardInfoCache = primaryShardInfoCache;
+        this.shardStrategyFactory = new ShardStrategyFactory(configuration);
 
         setCachedProperties();
 
@@ -127,23 +140,18 @@ public class ActorContext {
             selfAddressHostPort = null;
         }
 
-        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
     }
 
     private void setCachedProperties() {
-        txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+        txRateLimiter = new TransactionRateLimiter(this);
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
         operationTimeout = new Timeout(operationDuration);
 
         transactionCommitOperationTimeout =  new Timeout(Duration.create(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
 
         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
-
-        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
-                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
-                .build();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -169,13 +177,13 @@ public class ActorContext {
     public void setSchemaContext(SchemaContext schemaContext) {
         this.schemaContext = schemaContext;
 
-        if(shardManager != null) {
+        if (shardManager != null) {
             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
         }
     }
 
-    public void setDatastoreContext(DatastoreContext context) {
-        this.datastoreContext = context;
+    public void setDatastoreContext(DatastoreContextFactory contextFactory) {
+        this.datastoreContext = contextFactory.getBaseDatastoreContext();
         setCachedProperties();
 
         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
@@ -187,8 +195,8 @@ public class ActorContext {
 
         updated = true;
 
-        if(shardManager != null) {
-            shardManager.tell(context, ActorRef.noSender());
+        if (shardManager != null) {
+            shardManager.tell(contextFactory, ActorRef.noSender());
         }
     }
 
@@ -196,29 +204,31 @@ public class ActorContext {
         return schemaContext;
     }
 
-    public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
-        Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
-        if(ret != null){
+    public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+        Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
+        if (ret != null) {
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
                 new FindPrimary(shardName, true), shardInitializationTimeout);
 
-        return future.transform(new Mapper<Object, ActorSelection>() {
+        return future.transform(new Mapper<Object, PrimaryShardInfo>() {
             @Override
-            public ActorSelection checkedApply(Object response) throws Exception {
-                if(response instanceof PrimaryFound) {
-                    PrimaryFound found = (PrimaryFound)response;
-
-                    LOG.debug("Primary found {}", found.getPrimaryPath());
-                    ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
-                    primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
-                    return actorSelection;
-                } else if(response instanceof NotInitializedException) {
+            public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
+                if (response instanceof RemotePrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
+                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
+                } else if (response instanceof LocalPrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
+                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
+                            found.getLocalShardDataTree());
+                } else if (response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
-                } else if(response instanceof PrimaryNotFoundException) {
+                } else if (response instanceof PrimaryNotFoundException) {
                     throw (PrimaryNotFoundException)response;
-                } else if(response instanceof NoShardLeaderException) {
+                } else if (response instanceof NoShardLeaderException) {
                     throw (NoShardLeaderException)response;
                 }
 
@@ -228,8 +238,17 @@ public class ActorContext {
         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
+    private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
+            short primaryVersion, DataTree localShardDataTree) {
+        ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
+        PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
+            new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
+        primaryShardInfoCache.putSuccessful(shardName, info);
+        return info;
+    }
+
     /**
-     * Finds a local shard given its shard name and return it's ActorRef
+     * Finds a local shard given its shard name and return it's ActorRef.
      *
      * @param shardName the name of the local shard that needs to be found
      * @return a reference to a local shard actor which represents the shard
@@ -260,13 +279,13 @@ public class ActorContext {
         return future.map(new Mapper<Object, ActorRef>() {
             @Override
             public ActorRef checkedApply(Object response) throws Throwable {
-                if(response instanceof LocalShardFound) {
+                if (response instanceof LocalShardFound) {
                     LocalShardFound found = (LocalShardFound)response;
                     LOG.debug("Local shard found {}", found.getPath());
                     return found.getPath();
-                } else if(response instanceof NotInitializedException) {
+                } else if (response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
-                } else if(response instanceof LocalShardNotFound) {
+                } else if (response instanceof LocalShardNotFound) {
                     throw new LocalShardNotFoundException(
                             String.format("Local shard for %s does not exist.", shardName));
                 }
@@ -278,49 +297,51 @@ public class ActorContext {
     }
 
     /**
-     * Executes an operation on a local actor and wait for it's response
+     * Executes an operation on a local actor and wait for it's response.
      *
-     * @param actor
-     * @param message
+     * @param actor the actor
+     * @param message the message to send
      * @return The response of the operation
      */
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public Object executeOperation(ActorRef actor, Object message) {
         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
 
         try {
             return Await.result(future, operationDuration);
         } catch (Exception e) {
-            throw new TimeoutException("Sending message " + message.getClass().toString() +
-                    " to actor " + actor.toString() + " failed. Try again later.", e);
+            throw new TimeoutException("Sending message " + message.getClass().toString()
+                    " to actor " + actor.toString() + " failed. Try again later.", e);
         }
     }
 
-    public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
-        Preconditions.checkArgument(actor != null, "actor must not be null");
-        Preconditions.checkArgument(message != null, "message must not be null");
-
-        LOG.debug("Sending message {} to {}", message.getClass(), actor);
-        return doAsk(actor, message, timeout);
-    }
-
     /**
-     * Execute an operation on a remote actor and wait for it's response
+     * Execute an operation on a remote actor and wait for it's response.
      *
-     * @param actor
-     * @param message
-     * @return
+     * @param actor the actor
+     * @param message the message
+     * @return the response message
      */
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public Object executeOperation(ActorSelection actor, Object message) {
         Future<Object> future = executeOperationAsync(actor, message);
 
         try {
             return Await.result(future, operationDuration);
         } catch (Exception e) {
-            throw new TimeoutException("Sending message " + message.getClass().toString() +
-                    " to actor " + actor.toString() + " failed. Try again later.", e);
+            throw new TimeoutException("Sending message " + message.getClass().toString()
+                    " to actor " + actor.toString() + " failed. Try again later.", e);
         }
     }
 
+    public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
+        Preconditions.checkArgument(actor != null, "actor must not be null");
+        Preconditions.checkArgument(message != null, "message must not be null");
+
+        LOG.debug("Sending message {} to {}", message.getClass(), actor);
+        return doAsk(actor, message, timeout);
+    }
+
     /**
      * Execute an operation on a remote actor asynchronously.
      *
@@ -366,36 +387,40 @@ public class ActorContext {
         actor.tell(message, ActorRef.noSender());
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public void shutdown() {
-        shardManager.tell(PoisonPill.getInstance(), null);
-        actorSystem.shutdown();
+        FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
+        try {
+            Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
+        } catch (Exception e) {
+            LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
+        }
     }
 
     public ClusterWrapper getClusterWrapper() {
         return clusterWrapper;
     }
 
-    public String getCurrentMemberName(){
+    public MemberName getCurrentMemberName() {
         return clusterWrapper.getCurrentMemberName();
     }
 
     /**
-     * Send the message to each and every shard
-     *
-     * @param message
+     * Send the message to each and every shard.
      */
-    public void broadcast(final Object message){
-        for(final String shardName : configuration.getAllShardNames()){
+    public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass) {
+        for (final String shardName : configuration.getAllShardNames()) {
 
-            Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
-            primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+            Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+            primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
-                    if(failure != null) {
+                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
+                    if (failure != null) {
                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
-                                message.getClass().getSimpleName(), shardName, failure);
+                            messageClass.getSimpleName(), shardName, failure);
                     } else {
-                        primaryShard.tell(message, ActorRef.noSender());
+                        Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
+                        primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
                     }
                 }
             }, getClientDispatcher());
@@ -406,6 +431,10 @@ public class ActorContext {
         return operationDuration;
     }
 
+    public Timeout getOperationTimeout() {
+        return operationTimeout;
+    }
+
     public boolean isPathLocal(String path) {
         if (Strings.isNullOrEmpty(path)) {
             return false;
@@ -433,123 +462,88 @@ public class ActorContext {
         }
     }
 
-    /**
-     * @deprecated This method is present only to support backward compatibility with Helium and should not be
-     * used any further
-     *
-     *
-     * @param primaryPath
-     * @param localPathOfRemoteActor
-     * @return
-    */
-    @Deprecated
-    public String resolvePath(final String primaryPath,
-                                            final String localPathOfRemoteActor) {
-        StringBuilder builder = new StringBuilder();
-        String[] primaryPathElements = primaryPath.split("/");
-        builder.append(primaryPathElements[0]).append("//")
-            .append(primaryPathElements[1]).append(primaryPathElements[2]);
-        String[] remotePathElements = localPathOfRemoteActor.split("/");
-        for (int i = 3; i < remotePathElements.length; i++) {
-                builder.append("/").append(remotePathElements[i]);
-            }
-
-        return builder.toString();
-    }
-
-    /**
-     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
-     * should begin throttling the operations
-     *
-     * Parking reading this configuration here because we need to get to the actor system settings
-     *
-     * @return
-     */
-    public int getTransactionOutstandingOperationLimit(){
-        return transactionOutstandingOperationLimit;
-    }
-
     /**
      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
      * us to create a timer for pretty much anything.
      *
-     * @param operationName
-     * @return
+     * @param operationName the name of the operation
+     * @return the Timer instance
      */
-    public Timer getOperationTimer(String operationName){
-        return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
+    public Timer getOperationTimer(String operationName) {
+        return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
     }
 
-    public Timer getOperationTimer(String dataStoreType, String operationName){
+    public Timer getOperationTimer(String dataStoreType, String operationName) {
         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
                 operationName, METRIC_RATE);
         return metricRegistry.timer(rate);
     }
 
     /**
-     * Get the type of the data store to which this ActorContext belongs
+     * Get the name of the data store to which this ActorContext belongs.
      *
-     * @return
+     * @return the data store name
      */
-    public String getDataStoreType() {
-        return datastoreContext.getDataStoreType();
+    public String getDataStoreName() {
+        return datastoreContext.getDataStoreName();
     }
 
     /**
-     * Set the number of transaction creation permits that are to be allowed
+     * Get the current transaction creation rate limit.
      *
-     * @param permitsPerSecond
-     */
-    public void setTxCreationLimit(double permitsPerSecond){
-        txRateLimiter.setRate(permitsPerSecond);
-    }
-
-    /**
-     * Get the current transaction creation rate limit
-     * @return
+     * @return the rate limit
      */
-    public double getTxCreationLimit(){
-        return txRateLimiter.getRate();
+    public double getTxCreationLimit() {
+        return txRateLimiter.getTxCreationLimit();
     }
 
     /**
      * Try to acquire a transaction creation permit. Will block if no permits are available.
      */
-    public void acquireTxCreationPermit(){
+    public void acquireTxCreationPermit() {
         txRateLimiter.acquire();
     }
 
     /**
-     * Return the operation timeout to be used when committing transactions
-     * @return
+     * Returns the operation timeout to be used when committing transactions.
+     *
+     * @return the operation timeout
      */
-    public Timeout getTransactionCommitOperationTimeout(){
+    public Timeout getTransactionCommitOperationTimeout() {
         return transactionCommitOperationTimeout;
     }
 
     /**
      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
-     * code on the datastore
-     * @return
+     * code on the datastore.
+     *
+     * @return the dispatcher
      */
     public ExecutionContext getClientDispatcher() {
         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
     }
 
-    public String getNotificationDispatcherPath(){
+    public String getNotificationDispatcherPath() {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
-    protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    public ShardStrategyFactory getShardStrategyFactory() {
+        return shardStrategyFactory;
+    }
+
+    protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
         return ask(actorRef, message, timeout);
     }
 
-    protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+    protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout) {
         return ask(actorRef, message, timeout);
     }
 
-    @VisibleForTesting
-    Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
-        return primaryShardActorSelectionCache;
+    public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
+        return primaryShardInfoCache;
     }
 }