From d3a97264ecf47e8c60ea11a7caebce41b580e91d Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 14 May 2015 12:40:48 -0400 Subject: [PATCH] Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes Added a PrimaryShardInfoFutureCache class which maintains the cache of PrimaryShardInfo Future instances per shard. This code was moved from the ActorContext. The PrimaryShardInfoFutureCache instance is created by the DistributedDataStore and passed to the ShardManager, as well as to the ActorContext. The ActorContext uses the cache as before. On ShardLeaderChanged message, if the leaderId has changed, the ShardManager calls remove on the PrimaryShardInfoFutureCache to invalidate the entry for the shard name. As part of this patch some refactoring was done which removed ShardInfoListeners. There are a few miscellaneous changes in this patch as well, - Create the appropriate type of transaction on the datatree for local transactions - Tracking only read-only transactions which phantom references Change-Id: I4c7a3845e311e5130d80f22f7cfb3c82c5ad731d Signed-off-by: Tom Pantelis (cherry picked from commit 7fbe47abd15ea486c69c08f20c4cb21324775c3f) --- .../AbstractTransactionContextFactory.java | 93 +++++++++++-------- .../datastore/DistributedDataStore.java | 16 +++- .../datastore/LocalTransactionChain.java | 12 +++ .../datastore/LocalTransactionContext.java | 42 +++++---- .../datastore/LocalTransactionFactory.java | 6 ++ .../LocalTransactionFactoryImpl.java | 12 +++ .../cluster/datastore/ShardManager.java | 33 +++++-- .../datastore/TransactionChainProxy.java | 6 -- .../datastore/TransactionContextFactory.java | 39 +------- .../cluster/datastore/utils/ActorContext.java | 61 ++---------- .../utils/PrimaryShardInfoFutureCache.java | 37 ++++++++ .../datastore/utils/ShardInfoListener.java | 27 ------ .../utils/ShardInfoListenerRegistration.java | 30 ------ .../AbstractTransactionProxyTest.java | 4 - .../DistributedDataStoreIntegrationTest.java | 82 +++++++++++++--- ...butedDataStoreRemotingIntegrationTest.java | 70 ++++++++++++-- .../cluster/datastore/IntegrationTestKit.java | 30 ++++-- .../LocalTransactionContextTest.java | 14 ++- .../cluster/datastore/ShardManagerTest.java | 57 +++++------- .../cluster/datastore/ShardTestKit.java | 21 +++++ .../datastore/utils/ActorContextTest.java | 24 ++--- .../PrimaryShardInfoFutureCacheTest.java | 44 +++++++++ 22 files changed, 458 insertions(+), 302 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index a8a076f98f..b50426a811 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -19,7 +19,9 @@ import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,8 +32,7 @@ import scala.util.Try; * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local * transaction factories. */ -abstract class AbstractTransactionContextFactory - implements ShardInfoListener, AutoCloseable { +abstract class AbstractTransactionContextFactory implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class); protected static final AtomicLong TX_COUNTER = new AtomicLong(); @@ -117,34 +118,16 @@ abstract class AbstractTransactionContextFactory maybeDataTree = primaryShardInfo.getLocalShardDataTree(); if (maybeDataTree.isPresent()) { - knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get())); - LOG.debug("Shard {} resolved to local data tree", shardName); - } - } - - @Override - public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) { - final F existing = knownLocal.get(shardName); - if (existing != null) { - if (primaryShardInfo != null) { - final Optional maybeDataTree = primaryShardInfo.getLocalShardDataTree(); - if (maybeDataTree.isPresent()) { - final DataTree newDataTree = maybeDataTree.get(); - final DataTree oldDataTree = dataTreeForFactory(existing); - if (!oldDataTree.equals(newDataTree)) { - final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree); - knownLocal.replace(shardName, existing, newChain); - LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree); - } + if(!knownLocal.containsKey(shardName)) { + LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName); - return; - } - } - if (knownLocal.remove(shardName, existing)) { - LOG.debug("Shard {} invalidated data tree {}", shardName, existing); - } else { - LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing); + F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()); + knownLocal.putIfAbsent(shardName, factory); } + } else if(knownLocal.containsKey(shardName)) { + LOG.debug("Shard {} invalidating local data tree", shardName); + + knownLocal.remove(shardName); } } @@ -184,14 +167,6 @@ abstract class AbstractTransactionContextFactory void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection> cohortFutures); private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) { - return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter()); + switch(parent.getType()) { + case READ_ONLY: + final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier()); + return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getCompleter()) { + @Override + protected DOMStoreWriteTransaction getWriteDelegate() { + throw new UnsupportedOperationException(); + } + + @Override + protected DOMStoreReadTransaction getReadDelegate() { + return readOnly; + } + }; + case READ_WRITE: + final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier()); + return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getCompleter()) { + @Override + protected DOMStoreWriteTransaction getWriteDelegate() { + return readWrite; + } + + @Override + protected DOMStoreReadTransaction getReadDelegate() { + return readWrite; + } + }; + case WRITE_ONLY: + final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier()); + return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getCompleter()) { + @Override + protected DOMStoreWriteTransaction getWriteDelegate() { + return writeOnly; + } + + @Override + protected DOMStoreReadTransaction getReadDelegate() { + throw new UnsupportedOperationException(); + } + }; + default: + throw new IllegalArgumentException("Invalid transaction type: " + parent.getType()); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 18266658d3..f655186920 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXB import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; @@ -81,8 +82,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, String shardDispatcher = new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration, - datastoreContext, shardDispatcher, shardManagerId ), cluster, configuration, datastoreContext); + datastoreContext, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster, + configuration, datastoreContext, primaryShardInfoCache); this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; @@ -97,7 +100,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, datastoreInfoMXBean.registerMBean(); } - public DistributedDataStore(ActorContext actorContext) { + @VisibleForTesting + DistributedDataStore(ActorContext actorContext) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.txContextFactory = TransactionContextFactory.create(actorContext); this.type = UNKNOWN_TYPE; @@ -218,14 +222,16 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, } private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId){ + DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId, + PrimaryShardInfoFutureCache primaryShardInfoCache){ Exception lastException = null; for(int i=0;i<100;i++) { try { return actorSystem.actorOf( - ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch) - .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId); + ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch, + primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox( + ActorContext.MAILBOX), shardManagerId); } catch (Exception e){ lastException = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java index 93e09db32a..39d0133d02 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java @@ -11,8 +11,10 @@ import akka.actor.ActorSelection; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -69,8 +71,18 @@ final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain }; } + @Override + public DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier) { + return super.newReadOnlyTransaction(identifier); + } + @Override public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) { return super.newReadWriteTransaction(identifier); } + + @Override + public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) { + return super.newWriteOnlyTransaction(identifier); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index dd7c9194fd..e72c8a30f9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -14,7 +14,9 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import scala.concurrent.Future; @@ -25,38 +27,42 @@ import scala.concurrent.Future; * * @author Thomas Pantelis */ -final class LocalTransactionContext extends AbstractTransactionContext { - private final DOMStoreReadWriteTransaction delegate; +abstract class LocalTransactionContext extends AbstractTransactionContext { + + private final DOMStoreTransaction txDelegate; private final OperationCompleter completer; - LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate, OperationCompleter completer) { + LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationCompleter completer) { super(identifier); - this.delegate = Preconditions.checkNotNull(delegate); + this.txDelegate = Preconditions.checkNotNull(txDelegate); this.completer = Preconditions.checkNotNull(completer); } + protected abstract DOMStoreWriteTransaction getWriteDelegate(); + + protected abstract DOMStoreReadTransaction getReadDelegate(); + @Override - public void writeData(final YangInstanceIdentifier path, final NormalizedNode data) { - delegate.write(path, data); + public void writeData(YangInstanceIdentifier path, NormalizedNode data) { + getWriteDelegate().write(path, data); completer.onComplete(null, null); } @Override - public void mergeData(final YangInstanceIdentifier path, final NormalizedNode data) { - delegate.merge(path, data); + public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { + getWriteDelegate().merge(path, data); completer.onComplete(null, null); } @Override - public void deleteData(final YangInstanceIdentifier path) { - delegate.delete(path); + public void deleteData(YangInstanceIdentifier path) { + getWriteDelegate().delete(path); completer.onComplete(null, null); } @Override - public void readData(final YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { - - Futures.addCallback(delegate.read(path), new FutureCallback>>() { + public void readData(YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { + Futures.addCallback(getReadDelegate().read(path), new FutureCallback>>() { @Override public void onSuccess(Optional> result) { proxyFuture.set(result); @@ -72,8 +78,8 @@ final class LocalTransactionContext extends AbstractTransactionContext { } @Override - public void dataExists(final YangInstanceIdentifier path, final SettableFuture proxyFuture) { - Futures.addCallback(delegate.exists(path), new FutureCallback() { + public void dataExists(YangInstanceIdentifier path, final SettableFuture proxyFuture) { + Futures.addCallback(getReadDelegate().exists(path), new FutureCallback() { @Override public void onSuccess(Boolean result) { proxyFuture.set(result); @@ -89,7 +95,7 @@ final class LocalTransactionContext extends AbstractTransactionContext { } private LocalThreePhaseCommitCohort ready() { - LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) delegate.ready(); + LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready(); completer.onComplete(null, null); return ready; } @@ -111,6 +117,6 @@ final class LocalTransactionContext extends AbstractTransactionContext { @Override public void closeTransaction() { - delegate.close(); + txDelegate.close(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java index 8ca442498e..d574e83401 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java @@ -8,7 +8,9 @@ package org.opendaylight.controller.cluster.datastore; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; /** * A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate @@ -17,5 +19,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio * @author Thomas Pantelis */ interface LocalTransactionFactory { + DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier); + DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier); + + DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier); } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java index dce9b5c55b..149b9370ec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java @@ -11,8 +11,10 @@ import akka.actor.ActorSelection; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions; import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype; @@ -44,11 +46,21 @@ final class LocalTransactionFactoryImpl extends TransactionReadyPrototype tx) { // No-op diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index f4fa7b3a97..63d4c9826b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -64,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; @@ -120,10 +121,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final CountDownLatch waitTillReadyCountdownLatch; + private final PrimaryShardInfoFutureCache primaryShardInfoCache; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, + PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); @@ -134,6 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + this.primaryShardInfoCache = primaryShardInfoCache; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -149,13 +154,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ClusterWrapper cluster, final Configuration configuration, final DatastoreContext datastoreContext, - final CountDownLatch waitTillReadyCountdownLatch) { + final CountDownLatch waitTillReadyCountdownLatch, + final PrimaryShardInfoFutureCache primaryShardInfoCache) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); + Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, + waitTillReadyCountdownLatch, primaryShardInfoCache)); } @Override @@ -212,7 +220,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); - shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); + if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { + primaryShardInfoCache.remove(shardInformation.getShardName()); + } + checkReady(); } else { LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); @@ -826,10 +837,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } - void setLeaderId(String leaderId) { + boolean setLeaderId(String leaderId) { + boolean changed = !Objects.equal(this.leaderId, leaderId); this.leaderId = leaderId; notifyOnShardInitializedCallbacks(); + + return changed; } } @@ -840,18 +854,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Configuration configuration; final DatastoreContext datastoreContext; private final CountDownLatch waitTillReadyCountdownLatch; + private final PrimaryShardInfoFutureCache primaryShardInfoCache; - ShardManagerCreator(ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { + ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext, + CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + this.primaryShardInfoCache = primaryShardInfoCache; } @Override public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, + primaryShardInfoCache); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index b44f0b15b2..ff6471caa0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -153,7 +153,6 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory { - @GuardedBy("childChains") - private final Collection childChains = new ArrayList<>(); - - private final ShardInfoListenerRegistration reg; - private TransactionContextFactory(final ActorContext actorContext) { super(actorContext); - this.reg = actorContext.registerShardInfoListener(this); } static TransactionContextFactory create(final ActorContext actorContext) { @@ -41,7 +32,6 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory< @Override public void close() { - reg.close(); } @Override @@ -65,33 +55,6 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory< } DOMStoreTransactionChain createTransactionChain() { - final TransactionChainProxy ret = new TransactionChainProxy(this); - - synchronized (childChains) { - childChains.add(ret); - } - - return ret; - } - - void removeTransactionChain(final TransactionChainProxy chain) { - synchronized (childChains) { - childChains.remove(chain); - } - } - - @Override - public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) { - synchronized (childChains) { - for (TransactionChainProxy chain : childChains) { - chain.onShardInfoUpdated(shardName, primaryShardInfo); - } - super.onShardInfoUpdated(shardName, primaryShardInfo); - } - } - - @Override - protected DataTree dataTreeForFactory(final LocalTransactionFactoryImpl factory) { - return factory.getDataTree(); + return new TransactionChainProxy(this); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 5b4f54daf8..5c514cf775 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -15,25 +15,16 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.PoisonPill; -import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import java.util.ArrayList; -import java.util.Collection; import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; @@ -69,7 +60,7 @@ import scala.concurrent.duration.FiniteDuration; * easily. An ActorContext can be freely passed around to local object instances * but should not be passed to actors especially remote actors */ -public class ActorContext implements RemovalListener> { +public class ActorContext { private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; private static final String METRIC_RATE = "rate"; @@ -104,29 +95,29 @@ public class ActorContext implements RemovalListener> primaryShardInfoCache; private volatile SchemaContext schemaContext; private volatile boolean updated; private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry(); - @GuardedBy("shardInfoListeners") - private final Collection> shardInfoListeners = new ArrayList<>(); + + private final PrimaryShardInfoFutureCache primaryShardInfoCache; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { this(actorSystem, shardManager, clusterWrapper, configuration, - DatastoreContext.newBuilder().build()); + DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); } public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) { this.actorSystem = actorSystem; this.shardManager = shardManager; this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; this.dispatchers = new Dispatchers(actorSystem.dispatchers()); + this.primaryShardInfoCache = primaryShardInfoCache; setCachedProperties(); @@ -150,11 +141,6 @@ public class ActorContext implements RemovalListener reg : shardInfoListeners) { - reg.getInstance().onShardInfoUpdated(shardName, info); - } - } + primaryShardInfoCache.putSuccessful(shardName, info); return info; } @@ -567,32 +547,7 @@ public class ActorContext implements RemovalListener> getPrimaryShardInfoCache() { + public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() { return primaryShardInfoCache; } - - public ShardInfoListenerRegistration registerShardInfoListener(final T listener) { - final ShardInfoListenerRegistration reg = new ShardInfoListenerRegistration(listener, this); - - synchronized (shardInfoListeners) { - shardInfoListeners.add(reg); - } - return reg; - } - - protected void removeShardInfoListener(final ShardInfoListenerRegistration registration) { - synchronized (shardInfoListeners) { - shardInfoListeners.remove(registration); - } - } - - @Override - public void onRemoval(final RemovalNotification> notification) { - synchronized (shardInfoListeners) { - for (ShardInfoListenerRegistration reg : shardInfoListeners) { - reg.getInstance().onShardInfoUpdated(notification.getKey(), null); - } - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java new file mode 100644 index 0000000000..70d9b94a00 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.dispatch.Futures; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import scala.concurrent.Future; + +/** + * Maintains a cache of PrimaryShardInfo Future instances per shard. + * + * @author Thomas Pantelis + */ +public class PrimaryShardInfoFutureCache { + private final Cache> primaryShardInfoCache = CacheBuilder.newBuilder().build(); + + public @Nullable Future getIfPresent(@Nonnull String shardName) { + return primaryShardInfoCache.getIfPresent(shardName); + } + + public void putSuccessful(@Nonnull String shardName, @Nonnull PrimaryShardInfo info) { + primaryShardInfoCache.put(shardName, Futures.successful(info)); + } + + public void remove(@Nonnull String shardName) { + primaryShardInfoCache.invalidate(shardName); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java deleted file mode 100644 index 83c5d3716f..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.utils; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; - -/** - * Listener interface used to register for primary shard information changes. - * Implementations of this interface can be registered with {@link ActorContext} - * to receive notifications about shard information changes. - */ -public interface ShardInfoListener { - /** - * Update {@link PrimaryShardInfo} for a particular shard. - * @param shardName Shard name - * @param primaryShardInfo New {@link PrimaryShardInfo}, null if the information - * became unavailable. - */ - void onShardInfoUpdated(@Nonnull String shardName, @Nullable PrimaryShardInfo primaryShardInfo); -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java deleted file mode 100644 index 3dca66d49d..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.utils; - -import com.google.common.base.Preconditions; -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; - -/** - * Registration of a {@link ShardInfoListener} instance. - * - * @param Type of listener - */ -public class ShardInfoListenerRegistration extends AbstractObjectRegistration { - private final ActorContext parent; - - protected ShardInfoListenerRegistration(final T instance, final ActorContext parent) { - super(instance); - this.parent = Preconditions.checkNotNull(parent); - } - - @Override - protected void removeRegistration() { - parent.removeShardInfoListener(this); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 0b2d3ce108..27fe3c5869 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -66,8 +66,6 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; -import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener; -import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; @@ -154,8 +152,6 @@ public abstract class AbstractTransactionProxyTest { doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - doReturn(mock(ShardInfoListenerRegistration.class)).when(mockActorContext).registerShardInfoListener( - any(ShardInfoListener.class)); mockComponentFactory = TransactionContextFactory.create(mockActorContext); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 76ae3c7156..5f21ee1cae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -8,13 +8,16 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -23,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; @@ -31,8 +35,15 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -237,6 +248,38 @@ public class DistributedDataStoreIntegrationTest { }}; } + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testSingleTransactionsWritesInQuickSuccession", "cars-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + doCommit(writeTx.ready()); + + writeTx = txChain.newWriteOnlyTransaction(); + + int nCars = 5; + for(int i = 0; i < nCars; i++) { + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + } + + doCommit(writeTx.ready()); + + Optional> optional = txChain.newReadOnlyTransaction().read( + CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); + + cleanup(dataStore); + }}; + } + private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly) throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ @@ -637,7 +680,6 @@ public class DistributedDataStoreIntegrationTest { @Test public void testTransactionAbort() throws Exception{ - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); @@ -819,29 +861,45 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionsInQuickSuccession", "test-1"); + "testCreateChainedTransactionsInQuickSuccession", "cars-1"); - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + List> futures = new ArrayList<>(); - int nTxs = 20; - List cohorts = new ArrayList<>(nTxs); - for(int i = 0; i < nTxs; i++) { - DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + futures.add(writeTx.submit()); - rwTx.merge(TestModel.TEST_PATH, testNode); + int nCars = 100; + for(int i = 0; i < nCars; i++) { + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - cohorts.add(rwTx.ready()); + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + futures.add(rwTx.submit()); } - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - doCommit(cohort); + for(CheckedFuture f: futures) { + f.checkedGet(); } + Optional> optional = txChain.newReadOnlyTransaction().read( + LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); + txChain.close(); + broker.close(); + cleanup(dataStore); }}; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index be8ede9d6f..53b904ea36 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -55,6 +55,11 @@ public class DistributedDataStoreRemotingIntegrationTest { private static final String[] SHARD_NAMES = {"cars", "people"}; + private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559"); + + private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf"; + private ActorSystem leaderSystem; private ActorSystem followerSystem; @@ -62,7 +67,7 @@ public class DistributedDataStoreRemotingIntegrationTest { DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1); private final DatastoreContext.Builder followerDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(200); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); private DistributedDataStore followerDistributedDataStore; private DistributedDataStore leaderDistributedDataStore; @@ -72,11 +77,10 @@ public class DistributedDataStoreRemotingIntegrationTest { @Before public void setUpClass() { leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); - Cluster.get(leaderSystem).join(member1Address); + Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); - Cluster.get(followerSystem).join(member1Address); + Cluster.get(followerSystem).join(MEMBER_1_ADDRESS); } @After @@ -88,12 +92,10 @@ public class DistributedDataStoreRemotingIntegrationTest { private void initDatastores(String type) { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - String moduleShardsConfig = "module-shards-member1-and-2.conf"; - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES); - leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, true, SHARD_NAMES); + leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES); } @@ -355,6 +357,58 @@ public class DistributedDataStoreRemotingIntegrationTest { assertEquals("isPresent", false, optional.isPresent()); } + @Test + public void testSingleShardTransactionsWithLeaderChanges() throws Exception { + String testName = "testSingleShardTransactionsWithLeaderChanges"; + initDatastores(testName); + + String followerCarShardName = "member-2-shard-cars-" + testName; + InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class ); + + // Write top-level car container from the follower so it uses a remote Tx. + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + + followerTestKit.doCommit(writeTx.ready()); + + InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName); + + // Switch the leader to the follower + + followerDatastoreContextBuilder.shardElectionTimeoutFactor(1); + followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + + JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + + followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES); + + leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS); + + DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); + IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder); + DistributedDataStore newMember1Datastore = newMember1TestKit. + setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES); + + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES); + + // Write a car entry to the new leader - should switch to local Tx + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); + writeTx.merge(car1Path, car1); + + followerTestKit.doCommit(writeTx.ready()); + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1); + } + @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastores("testReadyLocalTransactionForwardedToLeader"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 94f20856ff..53e43f817d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -72,14 +72,7 @@ class IntegrationTestKit extends ShardTestKit { void waitUntilLeader(ActorContext actorContext, String... shardNames) { for(String shardName: shardNames) { - ActorRef shard = null; - for(int i = 0; i < 20 * 5 && shard == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - Optional shardReply = actorContext.findLocalShard(shardName); - if(shardReply.isPresent()) { - shard = shardReply.get(); - } - } + ActorRef shard = findLocalShard(actorContext, shardName); assertNotNull("Shard was not created", shard); @@ -87,6 +80,27 @@ class IntegrationTestKit extends ShardTestKit { } } + void waitUntilNoLeader(ActorContext actorContext, String... shardNames) { + for(String shardName: shardNames) { + ActorRef shard = findLocalShard(actorContext, shardName); + assertNotNull("No local shard found", shard); + + waitUntilNoLeader(shard); + } + } + + private ActorRef findLocalShard(ActorContext actorContext, String shardName) { + ActorRef shard = null; + for(int i = 0; i < 20 * 5 && shard == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + Optional shardReply = actorContext.findLocalShard(shardName); + if(shardReply.isPresent()) { + shard = shardReply.get(); + } + } + return shard; + } + void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, NormalizedNode nodeToWrite) throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index 7ca9f90a2c..7593eeadd3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -12,7 +12,9 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -32,7 +34,17 @@ public class LocalTransactionContextTest { @Before public void setUp(){ MockitoAnnotations.initMocks(this); - localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)); + localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)) { + @Override + protected DOMStoreWriteTransaction getWriteDelegate() { + return readWriteTransaction; + } + + @Override + protected DOMStoreReadTransaction getReadDelegate() { + return readWriteTransaction; + } + }; } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 645890dcb9..5f6973f5db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -60,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; @@ -93,6 +94,8 @@ public class ShardManagerTest extends AbstractActorTest { return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); } + private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -112,9 +115,9 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); } - private Props newShardMgrProps() { + private Props newShardMgrProps(boolean persistent) { return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.build(), ready); + datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache); } private Props newPropsShardMgrWithMockShardActor() { @@ -129,7 +132,7 @@ public class ShardManagerTest extends AbstractActorTest { @Override public ShardManager create() throws Exception { return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), - ready, name, shardActor); + ready, name, shardActor, primaryShardInfoCache); } }; @@ -529,7 +532,7 @@ public class ShardManagerTest extends AbstractActorTest { throws Exception { new JavaTestKit(getSystem()) {{ final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef.create(getSystem(), newShardMgrProps(true)); assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size()); @@ -564,7 +567,7 @@ public class ShardManagerTest extends AbstractActorTest { throws Exception { new JavaTestKit(getSystem()) {{ final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef.create(getSystem(), newShardMgrProps(true)); SchemaContext schemaContext = mock(SchemaContext.class); Set moduleIdentifierSet = new HashSet<>(); @@ -601,10 +604,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testRecoveryApplicable(){ new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + final Props persistentProps = newShardMgrProps(true); final TestActorRef persistentShardManager = TestActorRef.create(getSystem(), persistentProps); @@ -612,10 +612,7 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); - final Props nonPersistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(false).build(), ready); + final Props nonPersistentProps = newShardMgrProps(false); final TestActorRef nonPersistentShardManager = TestActorRef.create(getSystem(), nonPersistentProps); @@ -636,7 +633,8 @@ public class ShardManagerTest extends AbstractActorTest { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) { + return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), + ready, new PrimaryShardInfoFutureCache()) { @Override protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { DataPersistenceProviderMonitor dataPersistenceProviderMonitor @@ -674,7 +672,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -694,7 +692,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -716,7 +714,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -738,7 +736,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); @@ -751,10 +749,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testByDefaultSyncStatusIsFalse() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + final Props persistentProps = newShardMgrProps(true); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -768,7 +763,7 @@ public class ShardManagerTest extends AbstractActorTest { final Props persistentProps = ShardManager.props( new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -781,10 +776,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + final Props persistentProps = newShardMgrProps(true); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -805,7 +797,7 @@ public class ShardManagerTest extends AbstractActorTest { final Props persistentProps = ShardManager.props( new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -838,7 +830,7 @@ public class ShardManagerTest extends AbstractActorTest { return Arrays.asList("default", "astronauts"); } }, - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -881,7 +873,8 @@ public class ShardManagerTest extends AbstractActorTest { TestShardManager(String shardMrgIDSuffix) { super(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready); + DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready, + new PrimaryShardInfoFutureCache()); } @Override @@ -939,8 +932,8 @@ public class ShardManagerTest extends AbstractActorTest { protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, - ActorRef shardActor) { - super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache); this.shardActor = shardActor; this.name = name; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java index 2d1b14c269..7887290707 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java @@ -68,4 +68,25 @@ public class ShardTestKit extends JavaTestKit { Assert.fail("Leader not found for shard " + shard.path()); } + protected void waitUntilNoLeader(ActorRef shard) { + FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); + for(int i = 0; i < 20 * 5; i++) { + Future future = Patterns.ask(shard, new FindLeader(), new Timeout(duration)); + try { + FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration); + if(resp.getLeaderActor() == null) { + return; + } + } catch(TimeoutException e) { + } catch(Exception e) { + System.err.println("FindLeader threw ex"); + e.printStackTrace(); + } + + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Unexpected leader found for shard " + shard.path()); + } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 377c05bf8c..4ee89ca76d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -331,7 +331,7 @@ public class ActorContextTest extends AbstractActorTest{ public void testClientDispatcherIsGlobalDispatcher(){ ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), DatastoreContext.newBuilder().build()); + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -343,7 +343,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), DatastoreContext.newBuilder().build()); + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -356,7 +356,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder(). - operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build()); + operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache()); assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); assertEquals("getTransactionCommitOperationTimeout", 7, @@ -389,7 +389,7 @@ public class ActorContextTest extends AbstractActorTest{ final String expPrimaryPath = "akka://test-system/find-primary-shard"; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext) { + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath)); @@ -410,9 +410,7 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(cachedInfo, actual); - // Wait for 200 Milliseconds. The cached entry should have been removed. - - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + actorContext.getPrimaryShardInfoCache().remove("foobar"); cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); @@ -432,7 +430,7 @@ public class ActorContextTest extends AbstractActorTest{ final String expPrimaryPath = "akka://test-system/find-primary-shard"; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext) { + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree)); @@ -454,9 +452,7 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(cachedInfo, actual); - // Wait for 200 Milliseconds. The cached entry should have been removed. - - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + actorContext.getPrimaryShardInfoCache().remove("foobar"); cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); @@ -474,7 +470,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext) { + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new PrimaryNotFoundException("not found")); @@ -507,7 +503,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext) { + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new NotInitializedException("not iniislized")); @@ -547,7 +543,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class), mockConfig, - DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build()); + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache()); actorContext.broadcast(new TestMessage()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java new file mode 100644 index 0000000000..9d34ce5a08 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import akka.actor.ActorSelection; +import com.google.common.base.Optional; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import scala.concurrent.Future; + +/** + * Unit tests for PrimaryShardInfoFutureCache. + * + * @author Thomas Pantelis + */ +public class PrimaryShardInfoFutureCacheTest { + + @Test + public void testOperations() { + PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache(); + + assertEquals("getIfPresent", null, cache.getIfPresent("foo")); + + PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), Optional.absent()); + cache.putSuccessful("foo", shardInfo); + + Future future = cache.getIfPresent("foo"); + assertNotNull("Null future", future); + assertEquals("getIfPresent", shardInfo, future.value().get().get()); + + cache.remove("foo"); + + assertEquals("getIfPresent", null, cache.getIfPresent("foo")); + } +} -- 2.36.6