X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractDataStore.java;h=7f759d1b4296adb275f6cf7ee3296dde8c252623;hb=02d2891da4b71fe01ea8027a88ac6ddd353c2bcd;hp=dce5368218f30685f30fcc2894a5be73c08cab5a;hpb=28e9832cc97a345d5ceb69262784e5c8fef77e37;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index dce5368218..7f759d1b42 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.config.Configuration; @@ -28,19 +29,17 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXB import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; +import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; @@ -50,7 +49,7 @@ import org.slf4j.LoggerFactory; * Base implementation of a distributed DOMStore. */ public abstract class AbstractDataStore implements DistributedDataStoreInterface, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, + DatastoreContextPropertiesUpdater.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class); @@ -94,7 +93,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface .datastoreContextFactory(datastoreContextFactory) .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch) .primaryShardInfoCache(primaryShardInfoCache) - .restoreFromSnapshot(restoreFromSnapshot); + .restoreFromSnapshot(restoreFromSnapshot) + .distributedDataStore(this); actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher, shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), @@ -108,7 +108,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface } catch (Exception e) { LOG.error("Failed to get actor for {}", clientProps, e); clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } identifier = client.getIdentifier(); @@ -158,27 +159,6 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface this.closeable = closeable; } - @SuppressWarnings("unchecked") - @Override - public >> - ListenerRegistration registerChangeListener( - final YangInstanceIdentifier path, final L listener, - final AsyncDataBroker.DataChangeScope scope) { - - Preconditions.checkNotNull(path, "path should not be null"); - Preconditions.checkNotNull(listener, "listener should not be null"); - - LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); - - String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path); - - final DataChangeListenerRegistrationProxy listenerRegistrationProxy = - new DataChangeListenerRegistrationProxy(shardName, actorContext, listener); - listenerRegistrationProxy.init(path, scope); - - return listenerRegistrationProxy; - } - @Override public ListenerRegistration registerTreeChangeListener( final YangInstanceIdentifier treeId, final L listener) { @@ -280,8 +260,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface for (int i = 0; i < 100; i++) { try { - return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox( - ActorContext.BOUNDED_MAILBOX), shardManagerId); + return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher), shardManagerId); } catch (Exception e) { lastException = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -313,10 +292,28 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface delegate,shardLookup, shardName, insideShard); final DataTreeChangeListenerProxy listenerRegistrationProxy = - new DataTreeChangeListenerProxy<>(actorContext, delegate::onDataTreeChanged, insideShard); + new DataTreeChangeListenerProxy<>(actorContext, + // wrap this in the ClusteredDOMDataTreeChangeLister interface + // since we always want clustered registration + (ClusteredDOMDataTreeChangeListener) delegate::onDataTreeChanged, insideShard); listenerRegistrationProxy.init(shardName); return (ListenerRegistration) listenerRegistrationProxy; } + @SuppressWarnings("unchecked") + public ListenerRegistration registerShardConfigListener( + final YangInstanceIdentifier internalPath, + final DOMDataTreeChangeListener delegate) { + Preconditions.checkNotNull(delegate, "delegate should not be null"); + + LOG.debug("Registering a listener for the configuration shard: {}", internalPath); + + final DataTreeChangeListenerProxy proxy = + new DataTreeChangeListenerProxy<>(actorContext, delegate, internalPath); + proxy.init(ClusterUtils.PREFIX_CONFIG_SHARD_ID); + + return (ListenerRegistration) proxy; + } + }