import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
+ private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
+ private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
transactionCommitOperationTimeout = new Timeout(Duration.create(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+
+ shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+ primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+ .build();
}
public DatastoreContext getDatastoreContext() {
return schemaContext;
}
- /**
- * Finds the primary shard for the given shard name
- *
- * @param shardName
- * @return
- */
- public Optional<ActorSelection> findPrimaryShard(String shardName) {
- String path = findPrimaryPathOrNull(shardName);
- if (path == null){
- return Optional.absent();
- }
- return Optional.of(actorSystem.actorSelection(path));
- }
-
public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+ Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ if(ret != null){
+ return ret;
+ }
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(),
- datastoreContext.getShardInitializationTimeout());
+ new FindPrimary(shardName, true), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound found = PrimaryFound.fromSerializable(response);
+ if(response instanceof PrimaryFound) {
+ PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
- return actorSystem.actorSelection(found.getPrimaryPath());
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found primary shard %s but it's not initialized yet. " +
- "Please try again later", shardName));
- } else if(response instanceof PrimaryNotFound) {
- throw new PrimaryNotFoundException(
- String.format("No primary shard found for %S.", shardName));
+ ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
+ primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
+ return actorSelection;
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
}
throw new UnknownMessageException(String.format(
*/
public Future<ActorRef> findLocalShardAsync( final String shardName) {
Future<Object> future = executeOperationAsync(shardManager,
- new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+ new FindLocalShard(shardName, true), shardInitializationTimeout);
return future.map(new Mapper<Object, ActorRef>() {
@Override
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
return found.getPath();
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found local shard for %s but it's not initialized yet.",
- shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
} else if(response instanceof LocalShardNotFound) {
throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
}, getClientDispatcher());
}
- private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
-
- if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound found = PrimaryFound.fromSerializable(result);
-
- LOG.debug("Primary found {}", found.getPrimaryPath());
- return found.getPrimaryPath();
-
- } else if (result.getClass().equals(ActorNotInitialized.class)){
- throw new NotInitializedException(
- String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
- );
-
- } else {
- return null;
- }
- }
-
-
/**
* Executes an operation on a local actor and wait for it's response
*
Preconditions.checkArgument(message != null, "message must not be null");
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
*
* @param message
*/
- public void broadcast(Object message){
- for(String shardName : configuration.getAllShardNames()){
-
- Optional<ActorSelection> primary = findPrimaryShard(shardName);
- if (primary.isPresent()) {
- primary.get().tell(message, ActorRef.noSender());
- } else {
- LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
- message.getClass().getSimpleName(), shardName);
- }
+ public void broadcast(final Object message){
+ for(final String shardName : configuration.getAllShardNames()){
+
+ Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ if(failure != null) {
+ LOG.warn("broadcast failed to send message {} to shard {}: {}",
+ message.getClass().getSimpleName(), shardName, failure);
+ } else {
+ primaryShard.tell(message, ActorRef.noSender());
+ }
+ }
+ }, getClientDispatcher());
}
}
return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
}
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ @VisibleForTesting
+ Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
+ return primaryShardActorSelectionCache;
+ }
}