X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStore.java;h=cb6d44dd9264db8a0c1198cee2b2406adbde5f46;hp=fe321d4cd2808bff7c6be29b67222b4876c8b2fe;hb=925cb4a228d0fda99c7bfeb432eb25285a223887;hpb=d0621d28e507d9f6c0b9445d197f90253d34725d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index fe321d4cd2..cb6d44dd92 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -51,10 +51,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * Implements a distributed DOMStore. */ public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable { + DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, + DOMDataTreeCommitCohortRegistry, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); @@ -76,28 +77,15 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche private final TransactionContextFactory txContextFactory; - public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster, - Configuration configuration, DatastoreContextFactory datastoreContextFactory, - DatastoreSnapshot restoreFromSnapshot) { + @SuppressWarnings("checkstyle:IllegalCatch") + public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, + final Configuration configuration, final DatastoreContextFactory datastoreContextFactory, + final DatastoreSnapshot restoreFromSnapshot) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); - final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(), - datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()); - final ActorRef clientActor = actorSystem.actorOf(clientProps); - try { - client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.error("Failed to get actor for {}", clientProps, e); - clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - throw Throwables.propagate(e); - } - - identifier = client.getIdentifier(); - LOG.debug("Distributed data store client {} started", identifier); - String shardManagerId = ShardManagerIdentifier.builder() .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString(); @@ -108,39 +96,56 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); - ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration). - datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch). - primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot); + ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration) + .datastoreContextFactory(datastoreContextFactory) + .waitTillReadyCountdownLatch(waitTillReadyCountDownLatch) + .primaryShardInfoCache(primaryShardInfoCache) + .restoreFromSnapshot(restoreFromSnapshot); actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher, - shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache); + shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), + primaryShardInfoCache); - this.waitTillReadyTimeInMillis = - actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; + final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(), + datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext); + final ActorRef clientActor = actorSystem.actorOf(clientProps); + try { + client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Failed to get actor for {}", clientProps, e); + clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + throw Throwables.propagate(e); + } - this.txContextFactory = TransactionContextFactory.create(actorContext); + identifier = client.getIdentifier(); + LOG.debug("Distributed data store client {} started", identifier); + + this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout() + .duration().toMillis() * READY_WAIT_FACTOR; + + this.txContextFactory = new TransactionContextFactory(actorContext, identifier); datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl( datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext()); datastoreConfigMXBean.registerMBean(); - datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext(). - getDataStoreMXBeanType(), actorContext); + datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext() + .getDataStoreMXBeanType(), actorContext); datastoreInfoMXBean.registerMBean(); } @VisibleForTesting - DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) { + DistributedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.client = null; this.identifier = Preconditions.checkNotNull(identifier); - this.txContextFactory = TransactionContextFactory.create(actorContext); - this.waitTillReadyTimeInMillis = - actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; + this.txContextFactory = new TransactionContextFactory(actorContext, identifier); + this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout() + .duration().toMillis() * READY_WAIT_FACTOR; } - public void setCloseable(AutoCloseable closeable) { + public void setCloseable(final AutoCloseable closeable) { this.closeable = closeable; } @@ -148,8 +153,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche @Override public >> ListenerRegistration registerChangeListener( - final YangInstanceIdentifier path, L listener, - AsyncDataBroker.DataChangeScope scope) { + final YangInstanceIdentifier path, final L listener, + final AsyncDataBroker.DataChangeScope scope) { Preconditions.checkNotNull(path, "path should not be null"); Preconditions.checkNotNull(listener, "listener should not be null"); @@ -166,7 +171,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche } @Override - public ListenerRegistration registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) { + public ListenerRegistration registerTreeChangeListener( + final YangInstanceIdentifier treeId, final L listener) { Preconditions.checkNotNull(treeId, "treeId should not be null"); Preconditions.checkNotNull(listener, "listener should not be null"); @@ -174,7 +180,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); final DataTreeChangeListenerProxy listenerRegistrationProxy = - new DataTreeChangeListenerProxy(actorContext, listener); + new DataTreeChangeListenerProxy<>(actorContext, listener); listenerRegistrationProxy.init(shardName, treeId); return listenerRegistrationProxy; @@ -183,7 +189,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche @Override public DOMDataTreeCommitCohortRegistration registerCommitCohort( - DOMDataTreeIdentifier subtree, C cohort) { + final DOMDataTreeIdentifier subtree, final C cohort) { YangInstanceIdentifier treeId = Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier(); Preconditions.checkNotNull(cohort, "listener should not be null"); @@ -192,7 +198,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName); - DataTreeCohortRegistrationProxy cohortProxy = new DataTreeCohortRegistrationProxy(actorContext, subtree, cohort); + DataTreeCohortRegistrationProxy cohortProxy = + new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort); cohortProxy.init(shardName); return cohortProxy; } @@ -204,7 +211,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY); + return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY); } @Override @@ -220,12 +227,12 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche } @Override - public void onGlobalContextUpdated(SchemaContext schemaContext) { + public void onGlobalContextUpdated(final SchemaContext schemaContext) { actorContext.setSchemaContext(schemaContext); } @Override - public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) { + public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) { LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName()); actorContext.setDatastoreContext(contextFactory); @@ -233,6 +240,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche } @Override + @SuppressWarnings("checkstyle:IllegalCatch") public void close() { LOG.info("Closing data store {}", identifier); @@ -264,33 +272,35 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche return actorContext; } - public void waitTillReady(){ + public void waitTillReady() { LOG.info("Beginning to wait for data store to become ready : {}", identifier); try { if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) { LOG.debug("Data store {} is now ready", identifier); } else { - LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis)); + LOG.error("Shard leaders failed to settle in {} seconds, giving up", + TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis)); } } catch (InterruptedException e) { LOG.error("Interrupted while waiting for shards to settle", e); } } - private static ActorRef createShardManager(ActorSystem actorSystem, ShardManagerCreator creator, - String shardDispatcher, String shardManagerId) { + @SuppressWarnings("checkstyle:IllegalCatch") + private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator, + final String shardDispatcher, final String shardManagerId) { Exception lastException = null; - for(int i=0;i<100;i++) { + for (int i = 0; i < 100; i++) { try { return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox( ActorContext.BOUNDED_MAILBOX), shardManagerId); - } catch (Exception e){ + } catch (Exception e) { lastException = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying (retry count = {})", - shardManagerId, e.getMessage(), i); + LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying " + + "(retry count = {})", shardManagerId, e.getMessage(), i); } }