Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes 48/21148/2
authorTom Pantelis <tpanteli@brocade.com>
Thu, 14 May 2015 16:40:48 +0000 (12:40 -0400)
committerMoiz Raja <moraja@cisco.com>
Tue, 26 May 2015 16:26:40 +0000 (16:26 +0000)
Added a PrimaryShardInfoFutureCache class which maintains the cache of
PrimaryShardInfo Future instances per shard. This code was moved from the ActorContext.

The PrimaryShardInfoFutureCache instance is created by the
DistributedDataStore and passed to the ShardManager, as well as to the
ActorContext. The ActorContext uses the cache as before.

On ShardLeaderChanged message, if the leaderId has changed, the
ShardManager calls remove on the PrimaryShardInfoFutureCache to
invalidate the entry for the shard name.

As part of this patch some refactoring was done which removed ShardInfoListeners.

There are a few miscellaneous changes in this patch as well,
- Create the appropriate type of transaction on the datatree for local transactions
- Tracking only read-only transactions which phantom references

Change-Id: I4c7a3845e311e5130d80f22f7cfb3c82c5ad731d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 7fbe47abd15ea486c69c08f20c4cb21324775c3f)

22 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java [new file with mode: 0644]

index a8a076f98fcee0c73919d8187ddb64ec2b88718b..b50426a811e3f2fd677f187ba459a2c09653c34c 100644 (file)
@@ -19,7 +19,9 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,8 +32,7 @@ import scala.util.Try;
  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
  * transaction factories.
  */
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
-        implements ShardInfoListener, AutoCloseable {
+abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
 
     protected static final AtomicLong TX_COUNTER = new AtomicLong();
@@ -117,34 +118,16 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
         if (maybeDataTree.isPresent()) {
-            knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
-            LOG.debug("Shard {} resolved to local data tree", shardName);
-        }
-    }
-
-    @Override
-    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
-        final F existing = knownLocal.get(shardName);
-        if (existing != null) {
-            if (primaryShardInfo != null) {
-                final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
-                if (maybeDataTree.isPresent()) {
-                    final DataTree newDataTree = maybeDataTree.get();
-                    final DataTree oldDataTree = dataTreeForFactory(existing);
-                    if (!oldDataTree.equals(newDataTree)) {
-                        final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
-                        knownLocal.replace(shardName, existing, newChain);
-                        LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
-                    }
+            if(!knownLocal.containsKey(shardName)) {
+                LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
 
-                    return;
-                }
-            }
-            if (knownLocal.remove(shardName, existing)) {
-                LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
-            } else {
-                LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
+                F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
+                knownLocal.putIfAbsent(shardName, factory);
             }
+        } else if(knownLocal.containsKey(shardName)) {
+            LOG.debug("Shard {} invalidating local data tree", shardName);
+
+            knownLocal.remove(shardName);
         }
     }
 
@@ -184,14 +167,6 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
 
-    /**
-     * Extract the backing data tree from a particular factory.
-     *
-     * @param factory Transaction factory
-     * @return Backing data tree
-     */
-    protected abstract DataTree dataTreeForFactory(F factory);
-
     /**
      * Callback invoked from child transactions to push any futures, which need to
      * be waited for before the next transaction is allocated.
@@ -200,6 +175,48 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
 
     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
-        return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter());
+        switch(parent.getType()) {
+            case READ_ONLY:
+                final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getCompleter()) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readOnly;
+                    }
+                };
+            case READ_WRITE:
+                final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getCompleter()) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return readWrite;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readWrite;
+                    }
+                };
+            case WRITE_ONLY:
+                final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getCompleter()) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return writeOnly;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+             default:
+                 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+        }
     }
 }
index 18266658d3de421f592a5892b06b7a8baeec07f5..f65518692039be488dde962fb5a5d6f168ed88de 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXB
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
@@ -81,8 +82,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         String shardDispatcher =
                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
+        PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
-                datastoreContext, shardDispatcher, shardManagerId ), cluster, configuration, datastoreContext);
+                datastoreContext, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
+                configuration, datastoreContext, primaryShardInfoCache);
 
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
@@ -97,7 +100,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         datastoreInfoMXBean.registerMBean();
     }
 
-    public DistributedDataStore(ActorContext actorContext) {
+    @VisibleForTesting
+    DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
         this.txContextFactory = TransactionContextFactory.create(actorContext);
         this.type = UNKNOWN_TYPE;
@@ -218,14 +222,16 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     }
 
     private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
-                                        DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId){
+                                        DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId,
+                                        PrimaryShardInfoFutureCache primaryShardInfoCache){
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
             try {
                 return actorSystem.actorOf(
-                        ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
-                                .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId);
+                        ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch,
+                                primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
+                                        ActorContext.MAILBOX), shardManagerId);
             } catch (Exception e){
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
index 93e09db32a90a05bddbb3d08bbeebb7ab78057e3..39d0133d02f5f71eab1d509079f7fd0c688ba903 100644 (file)
@@ -11,8 +11,10 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -69,8 +71,18 @@ final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain
         };
     }
 
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier) {
+        return super.newReadOnlyTransaction(identifier);
+    }
+
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
         return super.newReadWriteTransaction(identifier);
     }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
+        return super.newWriteOnlyTransaction(identifier);
+    }
 }
index dd7c9194fde2cdd95b7f72be041c977ac7ad9739..e72c8a30f943a8c28209d0f9453862dfa551e47d 100644 (file)
@@ -14,7 +14,9 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -25,38 +27,42 @@ import scala.concurrent.Future;
  *
  * @author Thomas Pantelis
  */
-final class LocalTransactionContext extends AbstractTransactionContext {
-    private final DOMStoreReadWriteTransaction delegate;
+abstract class LocalTransactionContext extends AbstractTransactionContext {
+
+    private final DOMStoreTransaction txDelegate;
     private final OperationCompleter completer;
 
-    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate, OperationCompleter completer) {
+    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationCompleter completer) {
         super(identifier);
-        this.delegate = Preconditions.checkNotNull(delegate);
+        this.txDelegate = Preconditions.checkNotNull(txDelegate);
         this.completer = Preconditions.checkNotNull(completer);
     }
 
+    protected abstract DOMStoreWriteTransaction getWriteDelegate();
+
+    protected abstract DOMStoreReadTransaction getReadDelegate();
+
     @Override
-    public void writeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        delegate.write(path, data);
+    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        getWriteDelegate().write(path, data);
         completer.onComplete(null, null);
     }
 
     @Override
-    public void mergeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        delegate.merge(path, data);
+    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        getWriteDelegate().merge(path, data);
         completer.onComplete(null, null);
     }
 
     @Override
-    public void deleteData(final YangInstanceIdentifier path) {
-        delegate.delete(path);
+    public void deleteData(YangInstanceIdentifier path) {
+        getWriteDelegate().delete(path);
         completer.onComplete(null, null);
     }
 
     @Override
-    public void readData(final YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-
-        Futures.addCallback(delegate.read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+    public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
             @Override
             public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
@@ -72,8 +78,8 @@ final class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
-        Futures.addCallback(delegate.exists(path), new FutureCallback<Boolean>() {
+    public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
+        Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
                 proxyFuture.set(result);
@@ -89,7 +95,7 @@ final class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     private LocalThreePhaseCommitCohort ready() {
-        LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) delegate.ready();
+        LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
         completer.onComplete(null, null);
         return ready;
     }
@@ -111,6 +117,6 @@ final class LocalTransactionContext extends AbstractTransactionContext {
 
     @Override
     public void closeTransaction() {
-        delegate.close();
+        txDelegate.close();
     }
 }
index 8ca442498e73f5d3f0b42e3c36a2efb9a3f1f7ec..d574e83401d31ffed21891b0852489e562fcd727 100644 (file)
@@ -8,7 +8,9 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
  * A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate
@@ -17,5 +19,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
  * @author Thomas Pantelis
  */
 interface LocalTransactionFactory {
+    DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier);
+
     DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
+
+    DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier);
 }
\ No newline at end of file
index dce9b5c55b1ccd7ec50635c176a6bbf1a11706ff..149b9370ecba75bd264e2f50c8d5cdfd89a4b39a 100644 (file)
@@ -11,8 +11,10 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
@@ -44,11 +46,21 @@ final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<Transa
         return dataTree;
     }
 
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier) {
+        return SnapshotBackedTransactions.newReadTransaction(identifier, false, dataTree.takeSnapshot());
+    }
+
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
         return SnapshotBackedTransactions.newReadWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
     }
 
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
+        return SnapshotBackedTransactions.newWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
+    }
+
     @Override
     protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx) {
         // No-op
index f4fa7b3a97e8617f8ac474b494a08deb330a0b6d..63d4c9826b0835de095103c3c463c1d811e5ecae 100644 (file)
@@ -64,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -120,10 +121,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final CountDownLatch waitTillReadyCountdownLatch;
 
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+            PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -134,6 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = primaryShardInfoCache;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -149,13 +154,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ClusterWrapper cluster,
         final Configuration configuration,
         final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch) {
+        final CountDownLatch waitTillReadyCountdownLatch,
+        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+                waitTillReadyCountdownLatch, primaryShardInfoCache));
     }
 
     @Override
@@ -212,7 +220,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
-            shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+                primaryShardInfoCache.remove(shardInformation.getShardName());
+            }
+
             checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
@@ -826,10 +837,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return false;
         }
 
-        void setLeaderId(String leaderId) {
+        boolean setLeaderId(String leaderId) {
+            boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
 
             notifyOnShardInitializedCallbacks();
+
+            return changed;
         }
     }
 
@@ -840,18 +854,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Configuration configuration;
         final DatastoreContext datastoreContext;
         private final CountDownLatch waitTillReadyCountdownLatch;
+        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster,
-                            Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
+                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+                    primaryShardInfoCache);
         }
     }
 
index b44f0b15b29fb33620c14c4b1d38c8ba3d3a906d..ff6471caa0637ebfc21b7fc6947909a02c63bbe4 100644 (file)
@@ -153,7 +153,6 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory<Loca
 
         // Send a close transaction chain request to each and every shard
         getActorContext().broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
-        parent.removeTransactionChain(this);
     }
 
     private TransactionProxy allocateWriteTransaction(final TransactionType type) {
@@ -172,11 +171,6 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory<Loca
         return ret;
     }
 
-    @Override
-    protected DataTree dataTreeForFactory(final LocalTransactionChain factory) {
-        return factory.getDataTree();
-    }
-
     /**
      * This method is overridden to ensure the previous Tx's ready operations complete
      * before we initiate the next Tx in the chain to avoid creation failures if the
index 8d7ca990dd48f3605854a3ea00f67dbf6c85cd3d..49f2ea85ff3221c4357b9ef20344bffa9a27f6ec 100644 (file)
@@ -8,13 +8,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import java.util.ArrayList;
 import java.util.Collection;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import scala.concurrent.Future;
@@ -25,14 +22,8 @@ import scala.concurrent.Future;
  */
 final class TransactionContextFactory extends AbstractTransactionContextFactory<LocalTransactionFactoryImpl> {
 
-    @GuardedBy("childChains")
-    private final Collection<TransactionChainProxy> childChains = new ArrayList<>();
-
-    private final ShardInfoListenerRegistration<TransactionContextFactory> reg;
-
     private TransactionContextFactory(final ActorContext actorContext) {
         super(actorContext);
-        this.reg = actorContext.registerShardInfoListener(this);
     }
 
     static TransactionContextFactory create(final ActorContext actorContext) {
@@ -41,7 +32,6 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory<
 
     @Override
     public void close() {
-        reg.close();
     }
 
     @Override
@@ -65,33 +55,6 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory<
     }
 
     DOMStoreTransactionChain createTransactionChain() {
-        final TransactionChainProxy ret = new TransactionChainProxy(this);
-
-        synchronized (childChains) {
-            childChains.add(ret);
-        }
-
-        return ret;
-    }
-
-    void removeTransactionChain(final TransactionChainProxy chain) {
-        synchronized (childChains) {
-            childChains.remove(chain);
-        }
-    }
-
-    @Override
-    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
-        synchronized (childChains) {
-            for (TransactionChainProxy chain : childChains) {
-                chain.onShardInfoUpdated(shardName, primaryShardInfo);
-            }
-            super.onShardInfoUpdated(shardName, primaryShardInfo);
-        }
-    }
-
-    @Override
-    protected DataTree dataTreeForFactory(final LocalTransactionFactoryImpl factory) {
-        return factory.getDataTree();
+        return new TransactionChainProxy(this);
     }
 }
index 5b4f54daf8ea9b56f43e1afa1eb3f38dc9b5c1fd..5c514cf77502587fc15939df20e61187d0dfe297 100644 (file)
@@ -15,25 +15,16 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.PoisonPill;
-import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
@@ -69,7 +60,7 @@ import scala.concurrent.duration.FiniteDuration;
  * easily. An ActorContext can be freely passed around to local object instances
  * but should not be passed to actors especially remote actors
  */
-public class ActorContext implements RemovalListener<String, Future<PrimaryShardInfo>> {
+public class ActorContext {
     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
@@ -104,29 +95,29 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
-    @GuardedBy("shardInfoListeners")
-    private final Collection<ShardInfoListenerRegistration<?>> shardInfoListeners = new ArrayList<>();
+
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
         this(actorSystem, shardManager, clusterWrapper, configuration,
-                DatastoreContext.newBuilder().build());
+                DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
     }
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
         this.actorSystem = actorSystem;
         this.shardManager = shardManager;
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
+        this.primaryShardInfoCache = primaryShardInfoCache;
 
         setCachedProperties();
 
@@ -150,11 +141,6 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
 
         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
-
-        primaryShardInfoCache = CacheBuilder.newBuilder()
-                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
-                .removalListener(this)
-                .build();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -243,13 +229,7 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
             DataTree localShardDataTree) {
         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
-        primaryShardInfoCache.put(shardName, Futures.successful(info));
-
-        synchronized (shardInfoListeners) {
-            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
-                reg.getInstance().onShardInfoUpdated(shardName, info);
-            }
-        }
+        primaryShardInfoCache.putSuccessful(shardName, info);
         return info;
     }
 
@@ -567,32 +547,7 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
         return ask(actorRef, message, timeout);
     }
 
-    @VisibleForTesting
-    Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+    public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
         return primaryShardInfoCache;
     }
-
-    public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
-        final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
-
-        synchronized (shardInfoListeners) {
-            shardInfoListeners.add(reg);
-        }
-        return reg;
-    }
-
-    protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
-        synchronized (shardInfoListeners) {
-            shardInfoListeners.remove(registration);
-        }
-    }
-
-    @Override
-    public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
-        synchronized (shardInfoListeners) {
-            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
-                reg.getInstance().onShardInfoUpdated(notification.getKey(), null);
-            }
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java
new file mode 100644 (file)
index 0000000..70d9b94
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import scala.concurrent.Future;
+
+/**
+ * Maintains a cache of PrimaryShardInfo Future instances per shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfoFutureCache {
+    private final Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache = CacheBuilder.newBuilder().build();
+
+    public @Nullable Future<PrimaryShardInfo> getIfPresent(@Nonnull String shardName) {
+        return primaryShardInfoCache.getIfPresent(shardName);
+    }
+
+    public void putSuccessful(@Nonnull String shardName, @Nonnull PrimaryShardInfo info) {
+        primaryShardInfoCache.put(shardName, Futures.successful(info));
+    }
+
+    public void remove(@Nonnull String shardName) {
+        primaryShardInfoCache.invalidate(shardName);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java
deleted file mode 100644 (file)
index 83c5d37..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-
-/**
- * Listener interface used to register for primary shard information changes.
- * Implementations of this interface can be registered with {@link ActorContext}
- * to receive notifications about shard information changes.
- */
-public interface ShardInfoListener {
-    /**
-     * Update {@link PrimaryShardInfo} for a particular shard.
-     * @param shardName Shard name
-     * @param primaryShardInfo New {@link PrimaryShardInfo}, null if the information
-     *                         became unavailable.
-     */
-    void onShardInfoUpdated(@Nonnull String shardName, @Nullable PrimaryShardInfo primaryShardInfo);
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java
deleted file mode 100644 (file)
index 3dca66d..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-
-/**
- * Registration of a {@link ShardInfoListener} instance.
- *
- * @param <T> Type of listener
- */
-public class ShardInfoListenerRegistration<T extends ShardInfoListener> extends AbstractObjectRegistration<T> {
-    private final ActorContext parent;
-
-    protected ShardInfoListenerRegistration(final T instance, final ActorContext parent) {
-        super(instance);
-        this.parent = Preconditions.checkNotNull(parent);
-    }
-
-    @Override
-    protected void removeRegistration() {
-        parent.removeShardInfoListener(this);
-    }
-}
index 0b2d3ce108282354d25762727f9f76781bd3ca43..27fe3c5869342b3f8c9f39a792c0d318aa3f02c4 100644 (file)
@@ -66,8 +66,6 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
@@ -154,8 +152,6 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-        doReturn(mock(ShardInfoListenerRegistration.class)).when(mockActorContext).registerShardInfoListener(
-                any(ShardInfoListener.class));
 
         mockComponentFactory = TransactionContextFactory.create(mockActorContext);
 
index 76ae3c71566bdce5663918e05a7f1d6cf54e352b..5f21ee1caebc2cdf090349289b401bb8fa303905 100644 (file)
@@ -8,13 +8,16 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -23,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
@@ -31,8 +35,15 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -237,6 +248,38 @@ public class DistributedDataStoreIntegrationTest {
         }};
     }
 
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            DistributedDataStore dataStore = setupDistributedDataStore(
+                    "testSingleTransactionsWritesInQuickSuccession", "cars-1");
+
+            DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+            writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            doCommit(writeTx.ready());
+
+            writeTx = txChain.newWriteOnlyTransaction();
+
+            int nCars = 5;
+            for(int i = 0; i < nCars; i++) {
+                writeTx.write(CarsModel.newCarPath("car" + i),
+                        CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+            }
+
+            doCommit(writeTx.ready());
+
+            Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+                    CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
+            cleanup(dataStore);
+        }};
+    }
+
     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
             final boolean writeOnly) throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
@@ -637,7 +680,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testTransactionAbort() throws Exception{
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
@@ -819,29 +861,45 @@ public class DistributedDataStoreIntegrationTest {
     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore = setupDistributedDataStore(
-                    "testCreateChainedTransactionsInQuickSuccession", "test-1");
+                    "testCreateChainedTransactionsInQuickSuccession", "cars-1");
 
-            DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+            ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+                    ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+                            LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
 
-            NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+            DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+            List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
 
-            int nTxs = 20;
-            List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
-            for(int i = 0; i < nTxs; i++) {
-                DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+            DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
+            writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            futures.add(writeTx.submit());
 
-                rwTx.merge(TestModel.TEST_PATH, testNode);
+            int nCars = 100;
+            for(int i = 0; i < nCars; i++) {
+                DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
 
-                cohorts.add(rwTx.ready());
+                rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
+                        CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
 
+                futures.add(rwTx.submit());
             }
 
-            for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-                doCommit(cohort);
+            for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
+                f.checkedGet();
             }
 
+            Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+                    LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
             txChain.close();
 
+            broker.close();
+
             cleanup(dataStore);
         }};
     }
index be8ede9d6f4a893dc3c2851bc83557dfb4867f44..53b904ea36f103a2fad0d3af9f80e7a00d0ab57a 100644 (file)
@@ -55,6 +55,11 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     private static final String[] SHARD_NAMES = {"cars", "people"};
 
+    private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+    private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
+
+    private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
+
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
 
@@ -62,7 +67,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(200);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
 
     private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
@@ -72,11 +77,10 @@ public class DistributedDataStoreRemotingIntegrationTest {
     @Before
     public void setUpClass() {
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
-        Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
-        Cluster.get(leaderSystem).join(member1Address);
+        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
-        Cluster.get(followerSystem).join(member1Address);
+        Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
     }
 
     @After
@@ -88,12 +92,10 @@ public class DistributedDataStoreRemotingIntegrationTest {
     private void initDatastores(String type) {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
-        String moduleShardsConfig = "module-shards-member1-and-2.conf";
-
         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
+        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
 
-        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, true, SHARD_NAMES);
+        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
     }
@@ -355,6 +357,58 @@ public class DistributedDataStoreRemotingIntegrationTest {
         assertEquals("isPresent", false, optional.isPresent());
     }
 
+    @Test
+    public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
+        String testName = "testSingleShardTransactionsWithLeaderChanges";
+        initDatastores(testName);
+
+        String followerCarShardName = "member-2-shard-cars-" + testName;
+        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
+
+        // Write top-level car container from the follower so it uses a remote Tx.
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+
+        // Switch the leader to the follower
+
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
+        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+
+        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
+
+        DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
+                shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+        IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
+        DistributedDataStore newMember1Datastore = newMember1TestKit.
+                    setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+
+        // Write a car entry to the new leader - should switch to local Tx
+
+        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
+        writeTx.merge(car1Path, car1);
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
+    }
+
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastores("testReadyLocalTransactionForwardedToLeader");
index 94f20856ff43754dae02986b554259c9e637c212..53e43f817d2f996b358c5756ac3fca1147bb2604 100644 (file)
@@ -72,14 +72,7 @@ class IntegrationTestKit extends ShardTestKit {
 
     void waitUntilLeader(ActorContext actorContext, String... shardNames) {
         for(String shardName: shardNames) {
-            ActorRef shard = null;
-            for(int i = 0; i < 20 * 5 && shard == null; i++) {
-                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-                Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
-                if(shardReply.isPresent()) {
-                    shard = shardReply.get();
-                }
-            }
+            ActorRef shard = findLocalShard(actorContext, shardName);
 
             assertNotNull("Shard was not created", shard);
 
@@ -87,6 +80,27 @@ class IntegrationTestKit extends ShardTestKit {
         }
     }
 
+    void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
+        for(String shardName: shardNames) {
+            ActorRef shard = findLocalShard(actorContext, shardName);
+            assertNotNull("No local shard found", shard);
+
+            waitUntilNoLeader(shard);
+        }
+    }
+
+    private ActorRef findLocalShard(ActorContext actorContext, String shardName) {
+        ActorRef shard = null;
+        for(int i = 0; i < 20 * 5 && shard == null; i++) {
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
+            if(shardReply.isPresent()) {
+                shard = shardReply.get();
+            }
+        }
+        return shard;
+    }
+
     void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
             NormalizedNode<?, ?> nodeToWrite) throws Exception {
 
index 7ca9f90a2c70b9720f9623f2f5c2cfe9d11df60e..7593eeadd383144f0c02001004f61608562a839f 100644 (file)
@@ -12,7 +12,9 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -32,7 +34,17 @@ public class LocalTransactionContextTest {
     @Before
     public void setUp(){
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter));
+        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)) {
+            @Override
+            protected DOMStoreWriteTransaction getWriteDelegate() {
+                return readWriteTransaction;
+            }
+
+            @Override
+            protected DOMStoreReadTransaction getReadDelegate() {
+                return readWriteTransaction;
+            }
+        };
     }
 
     @Test
index 645890dcb9c0f058bddbca5d06c4bd6f6bdcabe7..5f6973f5dbff3cb23d798c6740c1ff6aeae77850 100644 (file)
@@ -60,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
@@ -93,6 +94,8 @@ public class ShardManagerTest extends AbstractActorTest {
         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
     }
 
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
@@ -112,9 +115,9 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.clear();
     }
 
-    private Props newShardMgrProps() {
+    private Props newShardMgrProps(boolean persistent) {
         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                datastoreContextBuilder.build(), ready);
+                datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -129,7 +132,7 @@ public class ShardManagerTest extends AbstractActorTest {
             @Override
             public ShardManager create() throws Exception {
                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
-                        ready, name, shardActor);
+                        ready, name, shardActor, primaryShardInfoCache);
             }
         };
 
@@ -529,7 +532,7 @@ public class ShardManagerTest extends AbstractActorTest {
             throws Exception {
         new JavaTestKit(getSystem()) {{
             final TestActorRef<ShardManager> shardManager =
-                    TestActorRef.create(getSystem(), newShardMgrProps());
+                    TestActorRef.create(getSystem(), newShardMgrProps(true));
 
             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
 
@@ -564,7 +567,7 @@ public class ShardManagerTest extends AbstractActorTest {
             throws Exception {
         new JavaTestKit(getSystem()) {{
             final TestActorRef<ShardManager> shardManager =
-                    TestActorRef.create(getSystem(), newShardMgrProps());
+                    TestActorRef.create(getSystem(), newShardMgrProps(true));
 
             SchemaContext schemaContext = mock(SchemaContext.class);
             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
@@ -601,10 +604,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRecoveryApplicable(){
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build(), ready);
+                final Props persistentProps = newShardMgrProps(true);
                 final TestActorRef<ShardManager> persistentShardManager =
                         TestActorRef.create(getSystem(), persistentProps);
 
@@ -612,10 +612,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
 
-                final Props nonPersistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(false).build(), ready);
+                final Props nonPersistentProps = newShardMgrProps(false);
                 final TestActorRef<ShardManager> nonPersistentShardManager =
                         TestActorRef.create(getSystem(), nonPersistentProps);
 
@@ -636,7 +633,8 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
+                        ready, new PrimaryShardInfoFutureCache()) {
                     @Override
                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
@@ -674,7 +672,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -694,7 +692,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -716,7 +714,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -738,7 +736,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
@@ -751,10 +749,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testByDefaultSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final Props persistentProps = newShardMgrProps(true);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -768,7 +763,7 @@ public class ShardManagerTest extends AbstractActorTest {
         final Props persistentProps = ShardManager.props(
                 new MockClusterWrapper(),
                 new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -781,10 +776,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final Props persistentProps = newShardMgrProps(true);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -805,7 +797,7 @@ public class ShardManagerTest extends AbstractActorTest {
         final Props persistentProps = ShardManager.props(
                 new MockClusterWrapper(),
                 new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -838,7 +830,7 @@ public class ShardManagerTest extends AbstractActorTest {
                         return Arrays.asList("default", "astronauts");
                     }
                 },
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -881,7 +873,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         TestShardManager(String shardMrgIDSuffix) {
             super(new MockClusterWrapper(), new MockConfiguration(),
-                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
+                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
+                    new PrimaryShardInfoFutureCache());
         }
 
         @Override
@@ -939,8 +932,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
-                ActorRef shardActor) {
-            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+                ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
             this.shardActor = shardActor;
             this.name = name;
         }
index 2d1b14c269f922e811f5ce9f8393873c064a7f24..78872907070415657f61b35397b4ee8d9677e582 100644 (file)
@@ -68,4 +68,25 @@ public class ShardTestKit extends JavaTestKit {
         Assert.fail("Leader not found for shard " + shard.path());
     }
 
+    protected void waitUntilNoLeader(ActorRef shard) {
+        FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+        for(int i = 0; i < 20 * 5; i++) {
+            Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
+            try {
+                FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
+                if(resp.getLeaderActor() == null) {
+                    return;
+                }
+            } catch(TimeoutException e) {
+            } catch(Exception e) {
+                System.err.println("FindLeader threw ex");
+                e.printStackTrace();
+            }
+
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Unexpected leader found for shard " + shard.path());
+    }
 }
\ No newline at end of file
index 377c05bf8c2eaed0bd6ac6b4278b780d20c95499..4ee89ca76d4866d616b97ba14b1e268fa1aeaa82 100644 (file)
@@ -331,7 +331,7 @@ public class ActorContextTest extends AbstractActorTest{
     public void testClientDispatcherIsGlobalDispatcher(){
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), DatastoreContext.newBuilder().build());
+                        mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
 
         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -343,7 +343,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         ActorContext actorContext =
                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), DatastoreContext.newBuilder().build());
+                        mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
 
         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -356,7 +356,7 @@ public class ActorContextTest extends AbstractActorTest{
         new JavaTestKit(getSystem()) {{
             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                             mock(Configuration.class), DatastoreContext.newBuilder().
-                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
 
             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
             assertEquals("getTransactionCommitOperationTimeout", 7,
@@ -389,7 +389,7 @@ public class ActorContextTest extends AbstractActorTest{
             final String expPrimaryPath = "akka://test-system/find-primary-shard";
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
@@ -410,9 +410,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             assertEquals(cachedInfo, actual);
 
-            // Wait for 200 Milliseconds. The cached entry should have been removed.
-
-            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+            actorContext.getPrimaryShardInfoCache().remove("foobar");
 
             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
@@ -432,7 +430,7 @@ public class ActorContextTest extends AbstractActorTest{
             final String expPrimaryPath = "akka://test-system/find-primary-shard";
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
@@ -454,9 +452,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             assertEquals(cachedInfo, actual);
 
-            // Wait for 200 Milliseconds. The cached entry should have been removed.
-
-            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+            actorContext.getPrimaryShardInfoCache().remove("foobar");
 
             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
@@ -474,7 +470,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new PrimaryNotFoundException("not found"));
@@ -507,7 +503,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new NotInitializedException("not iniislized"));
@@ -547,7 +543,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
                     mock(ClusterWrapper.class), mockConfig,
-                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
 
             actorContext.broadcast(new TestMessage());
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java
new file mode 100644 (file)
index 0000000..9d34ce5
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import scala.concurrent.Future;
+
+/**
+ * Unit tests for PrimaryShardInfoFutureCache.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfoFutureCacheTest {
+
+    @Test
+    public void testOperations() {
+        PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
+
+        assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
+
+        PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), Optional.<DataTree>absent());
+        cache.putSuccessful("foo", shardInfo);
+
+        Future<PrimaryShardInfo> future = cache.getIfPresent("foo");
+        assertNotNull("Null future", future);
+        assertEquals("getIfPresent", shardInfo, future.value().get().get());
+
+        cache.remove("foo");
+
+        assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
+    }
+}