Do not encode/decode DataChange registrations and notifications
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index 2ef8e5f449f8df564ae42943c6597224b832bcb4..479af79748033342041f32fe5221e68f78bf1c2f 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -30,7 +32,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
@@ -41,6 +42,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger
         LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
+    private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10;
 
     private final String type;
     private final ActorContext actorContext;
@@ -55,10 +57,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
      * This is typically used when we need to make a request to an actor and
      * wait for it's response and the consumer needs to be provided a Future.
      *
-     * FIXME : Make the thread pool configurable
+     * FIXME : Make the thread pool size configurable.
      */
-    private final ExecutorService executor =
-        Executors.newFixedThreadPool(10);
+    private final ListeningExecutorService executor =
+        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE));
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
         this(new ActorContext(actorSystem, actorSystem
@@ -82,18 +84,29 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Object result = actorContext.executeShardOperation(shardName,
+        Object result = actorContext.executeLocalShardOperation(shardName,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
-                scope).toSerializable(),
+                scope),
             ActorContext.ASK_DURATION
         );
 
-        RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result);
-        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
+        if (result != null) {
+            RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+            return new DataChangeListenerRegistrationProxy(actorContext
+                .actorSelection(reply.getListenerRegistrationPath()), listener,
+                dataChangeListenerActor);
+        }
+
+        LOG.debug(
+            "No local shard for shardName {} was found so returning a noop registration",
+            shardName);
+        return new NoOpDataChangeListenerRegistration(listener);
     }
 
 
 
+
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return new TransactionChainProxy(actorContext, executor, schemaContext);