BUG 8649: remove bounded mailbox from ShardManager and notification actors
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractDataStore.java
index db32b36d6e71275c121f8b76c607fee8f1c038ff..477e05bcc6ecb7c92d1bda3e22cd28c52def06fd 100644 (file)
@@ -25,13 +25,15 @@ import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
@@ -94,7 +96,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
                 .datastoreContextFactory(datastoreContextFactory)
                 .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
                 .primaryShardInfoCache(primaryShardInfoCache)
-                .restoreFromSnapshot(restoreFromSnapshot);
+                .restoreFromSnapshot(restoreFromSnapshot)
+                .distributedDataStore(this);
 
         actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
                 shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
@@ -136,6 +139,16 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
                 .duration().toMillis() * READY_WAIT_FACTOR;
     }
 
+    @VisibleForTesting
+    protected AbstractDataStore(final ActorContext actorContext, final ClientIdentifier identifier,
+                                final DataStoreClient clientActor) {
+        this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.client = clientActor;
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+                .duration().toMillis() * READY_WAIT_FACTOR;
+    }
+
     protected final DataStoreClient getClient() {
         return client;
     }
@@ -270,8 +283,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
 
         for (int i = 0; i < 100; i++) {
             try {
-                return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
-                        ActorContext.BOUNDED_MAILBOX), shardManagerId);
+                return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher), shardManagerId);
             } catch (Exception e) {
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -287,4 +299,44 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
     public CountDownLatch getWaitTillReadyCountDownLatch() {
         return waitTillReadyCountDownLatch;
     }
+
+    @SuppressWarnings("unchecked")
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
+            final YangInstanceIdentifier shardLookup,
+            final YangInstanceIdentifier insideShard,
+            final org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener delegate) {
+
+        Preconditions.checkNotNull(shardLookup, "shardLookup should not be null");
+        Preconditions.checkNotNull(insideShard, "insideShard should not be null");
+        Preconditions.checkNotNull(delegate, "delegate should not be null");
+
+        final String shardName = actorContext.getShardStrategyFactory().getStrategy(shardLookup).findShard(shardLookup);
+        LOG.debug("Registering tree listener: {} for tree: {} shard: {}, path inside shard: {}",
+                delegate,shardLookup, shardName, insideShard);
+
+        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> listenerRegistrationProxy =
+                new DataTreeChangeListenerProxy<>(actorContext,
+                        // wrap this in the ClusteredDOMDataTreeChangeLister interface
+                        // since we always want clustered registration
+                        (ClusteredDOMDataTreeChangeListener) delegate::onDataTreeChanged, insideShard);
+        listenerRegistrationProxy.init(shardName);
+
+        return (ListenerRegistration<L>) listenerRegistrationProxy;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+            final YangInstanceIdentifier internalPath,
+            final DOMDataTreeChangeListener delegate) {
+        Preconditions.checkNotNull(delegate, "delegate should not be null");
+
+        LOG.debug("Registering a listener for the configuration shard: {}", internalPath);
+
+        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+                new DataTreeChangeListenerProxy<>(actorContext, delegate, internalPath);
+        proxy.init(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+        return (ListenerRegistration<L>) proxy;
+    }
+
 }