Bug 1435: CDS: Added support for custom commit cohort.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index 7f2416efc36cc17ce9101fc968ca45a1edbc3adf..ea7330ae23316a36637081142480a8acc781ba0c 100644 (file)
@@ -19,18 +19,23 @@ import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -42,8 +47,8 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class DistributedDataStore implements DOMStore, SchemaContextListener,
-        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
+public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener,
+        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     private static final String UNKNOWN_TYPE = "unknown";
@@ -53,7 +58,6 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     private final ActorContext actorContext;
     private final long waitTillReadyTimeInMillis;
 
-
     private AutoCloseable closeable;
 
     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
@@ -67,13 +71,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     private final TransactionContextFactory txContextFactory;
 
     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
-            Configuration configuration, DatastoreContextFactory datastoreContextFactory) {
+            Configuration configuration, DatastoreContextFactory datastoreContextFactory,
+            DatastoreSnapshot restoreFromSnapshot) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
 
-        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
+        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
 
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
@@ -83,9 +88,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
-        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
-                datastoreContextFactory, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
-                configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
+
+        ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration).
+                datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch).
+                primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot);
+
+        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
+                shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
 
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
@@ -151,6 +160,23 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         return listenerRegistrationProxy;
     }
 
+
+    @Override
+    public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
+            DOMDataTreeIdentifier subtree, C cohort) {
+        YangInstanceIdentifier treeId =
+                Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
+        Preconditions.checkNotNull(cohort, "listener should not be null");
+
+
+        final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+        LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
+
+        DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<C>(actorContext, subtree, cohort);
+        cohortProxy.init(shardName);
+        return cohortProxy;
+    }
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return txContextFactory.createTransactionChain();
@@ -180,7 +206,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
     @Override
     public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) {
-        LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
+        LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
 
         actorContext.setDatastoreContext(contextFactory);
         datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
@@ -188,6 +214,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
     @Override
     public void close() {
+        LOG.info("Closing data store {}", type);
+
         if (datastoreConfigMXBean != null) {
             datastoreConfigMXBean.unregisterMBean();
         }
@@ -207,6 +235,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         actorContext.shutdown();
     }
 
+    @Override
     public ActorContext getActorContext() {
         return actorContext;
     }
@@ -225,17 +254,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         }
     }
 
-    private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
-                                        DatastoreContextFactory datastoreContextFactory, String shardDispatcher,
-                                        String shardManagerId, PrimaryShardInfoFutureCache primaryShardInfoCache){
+    private static ActorRef createShardManager(ActorSystem actorSystem, ShardManagerCreator creator,
+            String shardDispatcher, String shardManagerId) {
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
             try {
-                return actorSystem.actorOf(
-                        ShardManager.props(cluster, configuration, datastoreContextFactory, waitTillReadyCountDownLatch,
-                                primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
-                                        ActorContext.MAILBOX), shardManagerId);
+                return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
+                        ActorContext.BOUNDED_MAILBOX), shardManagerId);
             } catch (Exception e){
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);