X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStore.java;h=18266658d3de421f592a5892b06b7a8baeec07f5;hb=2f7c93174d7834a4c4aedacc9b88aa53a5a0422c;hp=c79de945675a0f14d8a40fd6dc13f4007a3a9669;hpb=52da008f461eb10786758eb45af16c1327d5b974;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index c79de94567..18266658d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -8,9 +8,11 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -21,10 +23,12 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -38,7 +42,7 @@ import org.slf4j.LoggerFactory; * */ public class DistributedDataStore implements DOMStore, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, AutoCloseable { + DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private static final String UNKNOWN_TYPE = "unknown"; @@ -59,6 +63,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, private final String type; + private final TransactionContextFactory txContextFactory; + public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); @@ -75,14 +81,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, String shardDispatcher = new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - actorContext = new ActorContext(actorSystem, actorSystem.actorOf( - ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch) - .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ), - cluster, configuration, datastoreContext); + actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration, + datastoreContext, shardDispatcher, shardManagerId ), cluster, configuration, datastoreContext); this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; + this.txContextFactory = TransactionContextFactory.create(actorContext); datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType()); datastoreConfigMXBean.setContext(datastoreContext); @@ -94,10 +99,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, public DistributedDataStore(ActorContext actorContext) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); + this.txContextFactory = TransactionContextFactory.create(actorContext); this.type = UNKNOWN_TYPE; this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; - } public void setCloseable(AutoCloseable closeable) { @@ -125,26 +130,41 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, return listenerRegistrationProxy; } + @Override + public ListenerRegistration registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) { + Preconditions.checkNotNull(treeId, "treeId should not be null"); + Preconditions.checkNotNull(listener, "listener should not be null"); + + final String shardName = ShardStrategyFactory.getStrategy(treeId).findShard(treeId); + LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); + + final DataTreeChangeListenerProxy listenerRegistrationProxy = + new DataTreeChangeListenerProxy(actorContext, listener); + listenerRegistrationProxy.init(shardName, treeId); + + return listenerRegistrationProxy; + } + @Override public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(actorContext); + return txContextFactory.createTransactionChain(); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); + return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { actorContext.acquireTxCreationPermit(); - return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); + return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { actorContext.acquireTxCreationPermit(); - return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); + return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE); } @Override @@ -165,15 +185,17 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, datastoreConfigMXBean.unregisterMBean(); datastoreInfoMXBean.unregisterMBean(); - if(closeable != null) { + if (closeable != null) { try { closeable.close(); } catch (Exception e) { - LOG.debug("Error closing insance", e); + LOG.debug("Error closing instance", e); } } + txContextFactory.close(); actorContext.shutdown(); + DistributedDataStoreFactory.destroyInstance(this); } @VisibleForTesting @@ -195,6 +217,25 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, } } + private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId){ + Exception lastException = null; + + for(int i=0;i<100;i++) { + try { + return actorSystem.actorOf( + ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch) + .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId); + } catch (Exception e){ + lastException = e; + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + LOG.debug(String.format("Could not create actor %s because of %s - waiting for sometime before retrying (retry count = %d)", shardManagerId, e.getMessage(), i)); + } + } + + throw new IllegalStateException("Failed to create Shard Manager", lastException); + } + @VisibleForTesting public CountDownLatch getWaitTillReadyCountDownLatch() { return waitTillReadyCountDownLatch;