Implementation of ModuleShardStrategy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index 3c760f35b8a671be81239e48ef6936a1e065c065..0c4fae0fc68c098e7f2f9e2daa4eba62828a1f20 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -29,6 +30,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  *
  */
@@ -41,8 +45,23 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private final String type;
     private final ActorContext actorContext;
 
-    public DistributedDataStore(ActorSystem actorSystem, String type) {
-        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type);
+    private SchemaContext schemaContext;
+
+
+
+    /**
+     * Executor used to run FutureTask's
+     *
+     * This is typically used when we need to make a request to an actor and
+     * wait for it's response and the consumer needs to be provided a Future.
+     *
+     * FIXME : Make the thread pool configurable
+     */
+    private final ExecutorService executor =
+        Executors.newFixedThreadPool(10);
+
+    public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
+        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type, cluster, configuration), "shardmanager-" + type), configuration), type);
     }
 
     public DistributedDataStore(ActorContext actorContext, String type) {
@@ -57,41 +76,47 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         AsyncDataBroker.DataChangeScope scope) {
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props());
+            DataChangeListener.props(listener));
+
+        String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
+        Object result = actorContext.executeShardOperation(shardName,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
-                AsyncDataBroker.DataChangeScope.BASE),
+                AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
             ActorContext.ASK_DURATION
         );
 
-        RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener);
+        RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result);
+        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
     }
 
 
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext);
+        return new TransactionChainProxy(actorContext, executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
+            executor, schemaContext);
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
         actorContext.getShardManager().tell(
             new UpdateSchemaContext(schemaContext), null);
     }