import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
private final Dispatchers dispatchers;
+ private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
setCachedProperties();
+ primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+ .build();
+
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+ TimeUnit.SECONDS));
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+ Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ if(ret != null){
+ return ret;
+ }
Future<Object> future = executeOperationAsync(shardManager,
new FindPrimary(shardName, true).toSerializable(),
datastoreContext.getShardInitializationTimeout());
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
PrimaryFound found = PrimaryFound.fromSerializable(response);
LOG.debug("Primary found {}", found.getPrimaryPath());
- return actorSystem.actorSelection(found.getPrimaryPath());
+ ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
+ primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
+ return actorSelection;
} else if(response instanceof ActorNotInitialized) {
throw new NotInitializedException(
String.format("Found primary shard %s but it's not initialized yet. " +
Preconditions.checkArgument(message != null, "message must not be null");
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
}
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ @VisibleForTesting
+ Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
+ return primaryShardActorSelectionCache;
+ }
}