X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStore.java;h=0244eb37407d53a7de46a44427e4ef571cd5bf49;hb=4e3f49788c05730b29468deebc2aaa4ed0d94eef;hp=d31217042aa7f65e801b6a1d7dbc82d89e47ccbc;hpb=a46305fbc6bb7ec6883c21298d356a5e4fbbb015;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index d31217042a..0244eb3740 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -20,18 +20,22 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -43,8 +47,8 @@ import org.slf4j.LoggerFactory; /** * */ -public class DistributedDataStore implements DOMStore, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable { +public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener, + DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private static final String UNKNOWN_TYPE = "unknown"; @@ -85,11 +89,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); - ShardManager.Builder builder = ShardManager.builder().cluster(cluster).configuration(configuration). + ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration). datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch). primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot); - actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, builder, shardDispatcher, + actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher, shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache); this.waitTillReadyTimeInMillis = @@ -156,6 +160,23 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, return listenerRegistrationProxy; } + + @Override + public DOMDataTreeCommitCohortRegistration registerCommitCohort( + DOMDataTreeIdentifier subtree, C cohort) { + YangInstanceIdentifier treeId = + Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier(); + Preconditions.checkNotNull(cohort, "listener should not be null"); + + + final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); + LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName); + + DataTreeCohortRegistrationProxy cohortProxy = new DataTreeCohortRegistrationProxy(actorContext, subtree, cohort); + cohortProxy.init(shardName); + return cohortProxy; + } + @Override public DOMStoreTransactionChain createTransactionChain() { return txContextFactory.createTransactionChain(); @@ -214,6 +235,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, actorContext.shutdown(); } + @Override public ActorContext getActorContext() { return actorContext; } @@ -232,18 +254,19 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, } } - private static ActorRef createShardManager(ActorSystem actorSystem, ShardManager.Builder builder, + private static ActorRef createShardManager(ActorSystem actorSystem, ShardManagerCreator creator, String shardDispatcher, String shardManagerId) { Exception lastException = null; for(int i=0;i<100;i++) { try { - return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox( + return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox( ActorContext.BOUNDED_MAILBOX), shardManagerId); } catch (Exception e){ lastException = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - LOG.debug(String.format("Could not create actor %s because of %s - waiting for sometime before retrying (retry count = %d)", shardManagerId, e.getMessage(), i)); + LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying (retry count = {})", + shardManagerId, e.getMessage(), i); } }