X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStore.java;h=f6c31aab04c76a376e18dad96dae95aa9cffb637;hb=96171122685765f15a6faf0cc6f919221224870c;hp=c780881a2ffad1ed50695b7a38111068ec2f8e3f;hpb=7225f60c394a26143f8421b0f99f2585699fa306;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index c780881a2f..f6c31aab04 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -12,6 +12,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.dispatch.OnComplete; import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -76,44 +78,44 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au Preconditions.checkNotNull(path, "path should not be null"); Preconditions.checkNotNull(listener, "listener should not be null"); - if(LOG.isDebugEnabled()) { - LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); - } - ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataChangeListener.props(listener )); + + LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); - Future future = actorContext.executeLocalShardOperationAsync(shardName, - new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), - new Timeout(actorContext.getOperationDuration().$times( - REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR))); + Optional shard = actorContext.findLocalShard(shardName); + + //if shard is NOT local + if (!shard.isPresent()) { + LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName); + return new NoOpDataChangeListenerRegistration(listener); + } + //if shard is local + ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener)); + Future future = actorContext.executeOperationAsync(shard.get(), + new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), + new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR))); - if (future != null) { - final DataChangeListenerRegistrationProxy listenerRegistrationProxy = + final DataChangeListenerRegistrationProxy listenerRegistrationProxy = new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor); - future.onComplete(new OnComplete(){ + future.onComplete(new OnComplete() { - @Override public void onComplete(Throwable failure, Object result) + @Override + public void onComplete(Throwable failure, Object result) throws Throwable { - if(failure != null){ - LOG.error("Failed to register listener at path " + path.toString(), failure); - return; - } - RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; - listenerRegistrationProxy.setListenerRegistrationActor(actorContext - .actorSelection(reply.getListenerRegistrationPath())); + if (failure != null) { + LOG.error("Failed to register listener at path " + path.toString(), failure); + return; } - }, actorContext.getActorSystem().dispatcher()); - return listenerRegistrationProxy; - } - if(LOG.isDebugEnabled()) { - LOG.debug( - "No local shard for shardName {} was found so returning a noop registration", - shardName); - } - return new NoOpDataChangeListenerRegistration(listener); + RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; + listenerRegistrationProxy.setListenerRegistrationActor(actorContext + .actorSelection(reply.getListenerRegistrationPath())); + } + }, actorContext.getActorSystem().dispatcher()); + + return listenerRegistrationProxy; + } @Override @@ -145,4 +147,9 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au public void close() throws Exception { actorContext.shutdown(); } + + @VisibleForTesting + ActorContext getActorContext() { + return actorContext; + } }