BUG-5280: use MemberName instead of String
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index d31217042aa7f65e801b6a1d7dbc82d89e47ccbc..0244eb37407d53a7de46a44427e4ef571cd5bf49 100644 (file)
@@ -20,18 +20,22 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -43,8 +47,8 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class DistributedDataStore implements DOMStore, SchemaContextListener,
-        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
+public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener,
+        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     private static final String UNKNOWN_TYPE = "unknown";
@@ -85,11 +89,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
 
-        ShardManager.Builder builder = ShardManager.builder().cluster(cluster).configuration(configuration).
+        ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration).
                 datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch).
                 primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot);
 
-        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, builder, shardDispatcher,
+        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
                 shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
 
         this.waitTillReadyTimeInMillis =
@@ -156,6 +160,23 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         return listenerRegistrationProxy;
     }
 
+
+    @Override
+    public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
+            DOMDataTreeIdentifier subtree, C cohort) {
+        YangInstanceIdentifier treeId =
+                Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
+        Preconditions.checkNotNull(cohort, "listener should not be null");
+
+
+        final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+        LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
+
+        DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<C>(actorContext, subtree, cohort);
+        cohortProxy.init(shardName);
+        return cohortProxy;
+    }
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return txContextFactory.createTransactionChain();
@@ -214,6 +235,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         actorContext.shutdown();
     }
 
+    @Override
     public ActorContext getActorContext() {
         return actorContext;
     }
@@ -232,18 +254,19 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         }
     }
 
-    private static ActorRef createShardManager(ActorSystem actorSystem, ShardManager.Builder builder,
+    private static ActorRef createShardManager(ActorSystem actorSystem, ShardManagerCreator creator,
             String shardDispatcher, String shardManagerId) {
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
             try {
-                return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox(
+                return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
                         ActorContext.BOUNDED_MAILBOX), shardManagerId);
             } catch (Exception e){
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-                LOG.debug(String.format("Could not create actor %s because of %s - waiting for sometime before retrying (retry count = %d)", shardManagerId, e.getMessage(), i));
+                LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying (retry count = {})",
+                    shardManagerId, e.getMessage(), i);
             }
         }