X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStore.java;h=780f28f358ec327f5ec6ab9cace63f769695e55a;hb=0de5494497cbd7ba9894935932d07e72a4be15b3;hp=2ef8e5f449f8df564ae42943c6597224b832bcb4;hpb=83140d53722ad77dd804f7b4d761a673110b83b3;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 2ef8e5f449..780f28f358 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -8,8 +8,11 @@ package org.opendaylight.controller.cluster.datastore; +import java.util.concurrent.Executors; + import akka.actor.ActorRef; import akka.actor.ActorSystem; + import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -30,8 +33,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * @@ -41,6 +44,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); + private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10; private final String type; private final ActorContext actorContext; @@ -55,10 +59,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au * This is typically used when we need to make a request to an actor and * wait for it's response and the consumer needs to be provided a Future. * - * FIXME : Make the thread pool configurable + * FIXME : Make the thread pool size configurable. */ - private final ExecutorService executor = - Executors.newFixedThreadPool(10); + private final ListeningExecutorService executor = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE)); public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) { this(new ActorContext(actorSystem, actorSystem @@ -82,18 +86,30 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); - Object result = actorContext.executeShardOperation(shardName, + Object result = actorContext.executeLocalShardOperation(shardName, new RegisterChangeListener(path, dataChangeListenerActor.path(), scope).toSerializable(), ActorContext.ASK_DURATION ); - RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result); - return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor); + if (result != null) { + RegisterChangeListenerReply reply = RegisterChangeListenerReply + .fromSerializable(actorContext.getActorSystem(), result); + return new DataChangeListenerRegistrationProxy(actorContext + .actorSelection(reply.getListenerRegistrationPath()), listener, + dataChangeListenerActor); + } + + LOG.debug( + "No local shard for shardName {} was found so returning a noop registration", + shardName); + return new NoOpDataChangeListenerRegistration(listener); } + + @Override public DOMStoreTransactionChain createTransactionChain() { return new TransactionChainProxy(actorContext, executor, schemaContext);