Changed RegisterChangeListener and RegisterChangeListenerReply
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index f64c6f1a8669888726f30bfe4099aa628365ccbb..3c12fa6e86158aed9579b245f3a418b1d9df87b3 100644 (file)
@@ -29,10 +29,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  *
  */
-public class DistributedDataStore implements DOMStore, SchemaContextListener {
+public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
 
     private static final Logger
         LOG = LoggerFactory.getLogger(DistributedDataStore.class);
@@ -41,8 +44,23 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener {
     private final String type;
     private final ActorContext actorContext;
 
+    private SchemaContext schemaContext;
+
+
+
+    /**
+     * Executor used to run FutureTask's
+     *
+     * This is typically used when we need to make a request to an actor and
+     * wait for it's response and the consumer needs to be provided a Future.
+     *
+     * FIXME : Make the thread pool configurable
+     */
+    private final ExecutorService executor =
+        Executors.newFixedThreadPool(10);
+
     public DistributedDataStore(ActorSystem actorSystem, String type) {
-        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type);
+        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type), "shardmanager-" + type)), type);
     }
 
     public DistributedDataStore(ActorContext actorContext, String type) {
@@ -56,40 +74,52 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener {
         InstanceIdentifier path, L listener,
         AsyncDataBroker.DataChangeScope scope) {
 
-        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props());
+        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+            DataChangeListener.props(listener));
 
         Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
-                AsyncDataBroker.DataChangeScope.BASE),
-            ActorContext.ASK_DURATION);
+                AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
+            ActorContext.ASK_DURATION
+        );
 
-        RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-        return new ListenerRegistrationProxy(reply.getListenerRegistrationPath());
+        RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result);
+        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
     }
 
 
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext);
+        return new TransactionChainProxy(actorContext, executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
+            executor, schemaContext);
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
-        actorContext.getShardManager().tell(new UpdateSchemaContext(schemaContext), null);
+        this.schemaContext = schemaContext;
+        actorContext.getShardManager().tell(
+            new UpdateSchemaContext(schemaContext), null);
+    }
+
+    @Override public void close() throws Exception {
+        actorContext.shutdown();
+
     }
 }