Merge "Bug 1637: Change Rpc actor calls to async"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index 404a4e02033ea1c89f9fada4135cfe0e4b6a935e..0a137e07df43a1bb7ca2fb3e854d7f63adfd46a3 100644 (file)
@@ -8,14 +8,16 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -34,6 +36,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.duration.Duration;
+
 /**
  *
  */
@@ -42,11 +46,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
-
-    private SchemaContext schemaContext;
+    private final DatastoreContext datastoreContext;
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
-            Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
@@ -57,13 +60,25 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
-        this.actorContext = new ActorContext(actorSystem, actorSystem
-            .actorOf(ShardManager.props(type, cluster, configuration, dataStoreProperties),
-                shardManagerId ), cluster, configuration);
+        datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.create(
+                dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
+                dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
+                dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
+                Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
+                        TimeUnit.MINUTES));
+
+        actorContext
+                = new ActorContext(
+                    actorSystem, actorSystem.actorOf(
+                        ShardManager.props(type, cluster, configuration, datastoreContext).
+                            withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
+
+        actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
     }
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.datastoreContext = new DatastoreContext();
     }
 
 
@@ -80,13 +95,12 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props(schemaContext,listener,path ));
+            DataChangeListener.props(listener ));
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         Object result = actorContext.executeLocalShardOperation(shardName,
-            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-            ActorContext.ASK_DURATION);
+            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
 
         if (result != null) {
             RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
@@ -104,35 +118,31 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext, schemaContext);
+        return new TransactionChainProxy(actorContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
-    @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
-        this.schemaContext = schemaContext;
-        actorContext.getShardManager().tell(
-            new UpdateSchemaContext(schemaContext), null);
+    @Override
+    public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        actorContext.setSchemaContext(schemaContext);
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         actorContext.shutdown();
-
     }
 }