X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractDataStore.java;h=477e05bcc6ecb7c92d1bda3e22cd28c52def06fd;hp=87706763c5f59d64c73fedfb81d17d8c18a81f5b;hb=08269ac33356663d75f0df8cc54936eac5553e7b;hpb=2f77e92af7a68b4a97dbfb709c6cc9b11a49878a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index 87706763c5..477e05bcc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -28,10 +28,12 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXB import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; @@ -94,7 +96,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface .datastoreContextFactory(datastoreContextFactory) .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch) .primaryShardInfoCache(primaryShardInfoCache) - .restoreFromSnapshot(restoreFromSnapshot); + .restoreFromSnapshot(restoreFromSnapshot) + .distributedDataStore(this); actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher, shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), @@ -136,6 +139,16 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface .duration().toMillis() * READY_WAIT_FACTOR; } + @VisibleForTesting + protected AbstractDataStore(final ActorContext actorContext, final ClientIdentifier identifier, + final DataStoreClient clientActor) { + this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); + this.client = clientActor; + this.identifier = Preconditions.checkNotNull(identifier); + this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout() + .duration().toMillis() * READY_WAIT_FACTOR; + } + protected final DataStoreClient getClient() { return client; } @@ -270,8 +283,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface for (int i = 0; i < 100; i++) { try { - return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox( - ActorContext.BOUNDED_MAILBOX), shardManagerId); + return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher), shardManagerId); } catch (Exception e) { lastException = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -287,4 +299,44 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface public CountDownLatch getWaitTillReadyCountDownLatch() { return waitTillReadyCountDownLatch; } + + @SuppressWarnings("unchecked") + public ListenerRegistration registerProxyListener( + final YangInstanceIdentifier shardLookup, + final YangInstanceIdentifier insideShard, + final org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener delegate) { + + Preconditions.checkNotNull(shardLookup, "shardLookup should not be null"); + Preconditions.checkNotNull(insideShard, "insideShard should not be null"); + Preconditions.checkNotNull(delegate, "delegate should not be null"); + + final String shardName = actorContext.getShardStrategyFactory().getStrategy(shardLookup).findShard(shardLookup); + LOG.debug("Registering tree listener: {} for tree: {} shard: {}, path inside shard: {}", + delegate,shardLookup, shardName, insideShard); + + final DataTreeChangeListenerProxy listenerRegistrationProxy = + new DataTreeChangeListenerProxy<>(actorContext, + // wrap this in the ClusteredDOMDataTreeChangeLister interface + // since we always want clustered registration + (ClusteredDOMDataTreeChangeListener) delegate::onDataTreeChanged, insideShard); + listenerRegistrationProxy.init(shardName); + + return (ListenerRegistration) listenerRegistrationProxy; + } + + @SuppressWarnings("unchecked") + public ListenerRegistration registerShardConfigListener( + final YangInstanceIdentifier internalPath, + final DOMDataTreeChangeListener delegate) { + Preconditions.checkNotNull(delegate, "delegate should not be null"); + + LOG.debug("Registering a listener for the configuration shard: {}", internalPath); + + final DataTreeChangeListenerProxy proxy = + new DataTreeChangeListenerProxy<>(actorContext, delegate, internalPath); + proxy.init(ClusterUtils.PREFIX_CONFIG_SHARD_ID); + + return (ListenerRegistration) proxy; + } + }