import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
- private final DatastoreContext datastoreContext;
- private final FiniteDuration operationDuration;
- private final Timeout operationTimeout;
+ private DatastoreContext datastoreContext;
+ private FiniteDuration operationDuration;
+ private Timeout operationTimeout;
private final String selfAddressHostPort;
- private final RateLimiter txRateLimiter;
+ private RateLimiter txRateLimiter;
private final MetricRegistry metricRegistry = new MetricRegistry();
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
- private final Timeout transactionCommitOperationTimeout;
+ private Timeout transactionCommitOperationTimeout;
+ private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
+ private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
private volatile SchemaContext schemaContext;
+ private volatile boolean updated;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
- this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
- operationTimeout = new Timeout(operationDuration);
- transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
- TimeUnit.SECONDS));
-
+ setCachedProperties();
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
+ private void setCachedProperties() {
+ txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+
+ transactionCommitOperationTimeout = new Timeout(Duration.create(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+
+ shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+ primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+ .build();
+ }
+
public DatastoreContext getDatastoreContext() {
return datastoreContext;
}
this.schemaContext = schemaContext;
if(shardManager != null) {
- shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
}
}
- public SchemaContext getSchemaContext() {
- return schemaContext;
- }
+ public void setDatastoreContext(DatastoreContext context) {
+ this.datastoreContext = context;
+ setCachedProperties();
- /**
- * Finds the primary shard for the given shard name
- *
- * @param shardName
- * @return
- */
- public Optional<ActorSelection> findPrimaryShard(String shardName) {
- String path = findPrimaryPathOrNull(shardName);
- if (path == null){
- return Optional.absent();
+ // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
+ // will be published immediately even though they may not be immediately visible to other
+ // threads due to unsynchronized reads. That's OK though - we're going for eventual
+ // consistency here as immediately visible updates to these members aren't critical. These
+ // members could've been made volatile but wanted to avoid volatile reads as these are
+ // accessed often and updates will be infrequent.
+
+ updated = true;
+
+ if(shardManager != null) {
+ shardManager.tell(context, ActorRef.noSender());
}
- return Optional.of(actorSystem.actorSelection(path));
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
}
public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+ Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ if(ret != null){
+ return ret;
+ }
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(),
- datastoreContext.getShardInitializationTimeout());
+ new FindPrimary(shardName, true), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound found = PrimaryFound.fromSerializable(response);
+ if(response instanceof PrimaryFound) {
+ PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
- return actorSystem.actorSelection(found.getPrimaryPath());
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found primary shard %s but it's not initialized yet. " +
- "Please try again later", shardName));
- } else if(response instanceof PrimaryNotFound) {
- throw new PrimaryNotFoundException(
- String.format("No primary shard found for %S.", shardName));
+ ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
+ primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
+ return actorSelection;
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
}
throw new UnknownMessageException(String.format(
*/
public Future<ActorRef> findLocalShardAsync( final String shardName) {
Future<Object> future = executeOperationAsync(shardManager,
- new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+ new FindLocalShard(shardName, true), shardInitializationTimeout);
return future.map(new Mapper<Object, ActorRef>() {
@Override
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
return found.getPath();
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found local shard for %s but it's not initialized yet.",
- shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
} else if(response instanceof LocalShardNotFound) {
throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
}, getClientDispatcher());
}
- private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
-
- if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound found = PrimaryFound.fromSerializable(result);
-
- LOG.debug("Primary found {}", found.getPrimaryPath());
- return found.getPrimaryPath();
-
- } else if (result.getClass().equals(ActorNotInitialized.class)){
- throw new NotInitializedException(
- String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
- );
-
- } else {
- return null;
- }
- }
-
-
/**
* Executes an operation on a local actor and wait for it's response
*
Preconditions.checkArgument(message != null, "message must not be null");
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
*
* @param message
*/
- public void broadcast(Object message){
- for(String shardName : configuration.getAllShardNames()){
-
- Optional<ActorSelection> primary = findPrimaryShard(shardName);
- if (primary.isPresent()) {
- primary.get().tell(message, ActorRef.noSender());
- } else {
- LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
- message.getClass().getSimpleName(), shardName);
- }
+ public void broadcast(final Object message){
+ for(final String shardName : configuration.getAllShardNames()){
+
+ Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ if(failure != null) {
+ LOG.warn("broadcast failed to send message {} to shard {}: {}",
+ message.getClass().getSimpleName(), shardName, failure);
+ } else {
+ primaryShard.tell(message, ActorRef.noSender());
+ }
+ }
+ }, getClientDispatcher());
}
}
return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
}
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ @VisibleForTesting
+ Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
+ return primaryShardActorSelectionCache;
+ }
}