import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
- private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+ private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
- primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ primaryShardInfoCache = CacheBuilder.newBuilder()
.expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
.build();
}
return schemaContext;
}
- public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
- Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+ Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
if(ret != null){
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
new FindPrimary(shardName, true), shardInitializationTimeout);
- return future.transform(new Mapper<Object, ActorSelection>() {
+ return future.transform(new Mapper<Object, PrimaryShardInfo>() {
@Override
- public ActorSelection checkedApply(Object response) throws Exception {
+ public PrimaryShardInfo checkedApply(Object response) throws Exception {
if(response instanceof PrimaryFound) {
PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
- primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
- return actorSelection;
+ PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
+ primaryShardInfoCache.put(shardName, Futures.successful(info));
+ return info;
} else if(response instanceof NotInitializedException) {
throw (NotInitializedException)response;
} else if(response instanceof PrimaryNotFoundException) {
public void broadcast(final Object message){
for(final String shardName : configuration.getAllShardNames()){
- Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
- primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
message.getClass().getSimpleName(), shardName, failure);
} else {
- primaryShard.tell(message, ActorRef.noSender());
+ primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}
}, getClientDispatcher());
}
@VisibleForTesting
- Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
- return primaryShardActorSelectionCache;
+ Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+ return primaryShardInfoCache;
}
}