Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes 48/21148/2
authorTom Pantelis <tpanteli@brocade.com>
Thu, 14 May 2015 16:40:48 +0000 (12:40 -0400)
committerMoiz Raja <moraja@cisco.com>
Tue, 26 May 2015 16:26:40 +0000 (16:26 +0000)
Added a PrimaryShardInfoFutureCache class which maintains the cache of
PrimaryShardInfo Future instances per shard. This code was moved from the ActorContext.

The PrimaryShardInfoFutureCache instance is created by the
DistributedDataStore and passed to the ShardManager, as well as to the
ActorContext. The ActorContext uses the cache as before.

On ShardLeaderChanged message, if the leaderId has changed, the
ShardManager calls remove on the PrimaryShardInfoFutureCache to
invalidate the entry for the shard name.

As part of this patch some refactoring was done which removed ShardInfoListeners.

There are a few miscellaneous changes in this patch as well,
- Create the appropriate type of transaction on the datatree for local transactions
- Tracking only read-only transactions which phantom references

Change-Id: I4c7a3845e311e5130d80f22f7cfb3c82c5ad731d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 7fbe47abd15ea486c69c08f20c4cb21324775c3f)

22 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java [new file with mode: 0644]

index a8a076f..b50426a 100644 (file)
@@ -19,7 +19,9 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,8 +32,7 @@ import scala.util.Try;
  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
  * transaction factories.
  */
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
-        implements ShardInfoListener, AutoCloseable {
+abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
 
     protected static final AtomicLong TX_COUNTER = new AtomicLong();
@@ -117,34 +118,16 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
         if (maybeDataTree.isPresent()) {
-            knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
-            LOG.debug("Shard {} resolved to local data tree", shardName);
-        }
-    }
-
-    @Override
-    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
-        final F existing = knownLocal.get(shardName);
-        if (existing != null) {
-            if (primaryShardInfo != null) {
-                final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
-                if (maybeDataTree.isPresent()) {
-                    final DataTree newDataTree = maybeDataTree.get();
-                    final DataTree oldDataTree = dataTreeForFactory(existing);
-                    if (!oldDataTree.equals(newDataTree)) {
-                        final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
-                        knownLocal.replace(shardName, existing, newChain);
-                        LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
-                    }
+            if(!knownLocal.containsKey(shardName)) {
+                LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
 
-                    return;
-                }
-            }
-            if (knownLocal.remove(shardName, existing)) {
-                LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
-            } else {
-                LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
+                F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
+                knownLocal.putIfAbsent(shardName, factory);
             }
+        } else if(knownLocal.containsKey(shardName)) {
+            LOG.debug("Shard {} invalidating local data tree", shardName);
+
+            knownLocal.remove(shardName);
         }
     }
 
@@ -184,14 +167,6 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
 
-    /**
-     * Extract the backing data tree from a particular factory.
-     *
-     * @param factory Transaction factory
-     * @return Backing data tree
-     */
-    protected abstract DataTree dataTreeForFactory(F factory);
-
     /**
      * Callback invoked from child transactions to push any futures, which need to
      * be waited for before the next transaction is allocated.
@@ -200,6 +175,48 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
 
     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
-        return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter());
+        switch(parent.getType()) {
+            case READ_ONLY:
+                final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getCompleter()) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readOnly;
+                    }
+                };
+            case READ_WRITE:
+                final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getCompleter()) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return readWrite;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readWrite;
+                    }
+                };
+            case WRITE_ONLY:
+                final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getCompleter()) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return writeOnly;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+             default:
+                 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+        }
     }
 }
index 1826665..f655186 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXB
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
@@ -81,8 +82,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         String shardDispatcher =
                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
+        PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
-                datastoreContext, shardDispatcher, shardManagerId ), cluster, configuration, datastoreContext);
+                datastoreContext, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
+                configuration, datastoreContext, primaryShardInfoCache);
 
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
@@ -97,7 +100,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         datastoreInfoMXBean.registerMBean();
     }
 
-    public DistributedDataStore(ActorContext actorContext) {
+    @VisibleForTesting
+    DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
         this.txContextFactory = TransactionContextFactory.create(actorContext);
         this.type = UNKNOWN_TYPE;
@@ -218,14 +222,16 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     }
 
     private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
-                                        DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId){
+                                        DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId,
+                                        PrimaryShardInfoFutureCache primaryShardInfoCache){
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
             try {
                 return actorSystem.actorOf(
-                        ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
-                                .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId);
+                        ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch,
+                                primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
+                                        ActorContext.MAILBOX), shardManagerId);
             } catch (Exception e){
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
index 93e09db..39d0133 100644 (file)
@@ -11,8 +11,10 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -69,8 +71,18 @@ final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain
         };
     }
 
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier) {
+        return super.newReadOnlyTransaction(identifier);
+    }
+
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
         return super.newReadWriteTransaction(identifier);
     }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
+        return super.newWriteOnlyTransaction(identifier);
+    }
 }
index dd7c919..e72c8a3 100644 (file)
@@ -14,7 +14,9 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -25,38 +27,42 @@ import scala.concurrent.Future;
  *
  * @author Thomas Pantelis
  */
-final class LocalTransactionContext extends AbstractTransactionContext {
-    private final DOMStoreReadWriteTransaction delegate;
+abstract class LocalTransactionContext extends AbstractTransactionContext {
+
+    private final DOMStoreTransaction txDelegate;
     private final OperationCompleter completer;
 
-    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate, OperationCompleter completer) {
+    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationCompleter completer) {
         super(identifier);
-        this.delegate = Preconditions.checkNotNull(delegate);
+        this.txDelegate = Preconditions.checkNotNull(txDelegate);
         this.completer = Preconditions.checkNotNull(completer);
     }
 
+    protected abstract DOMStoreWriteTransaction getWriteDelegate();
+
+    protected abstract DOMStoreReadTransaction getReadDelegate();
+
     @Override
-    public void writeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        delegate.write(path, data);
+    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        getWriteDelegate().write(path, data);
         completer.onComplete(null, null);
     }
 
     @Override
-    public void mergeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        delegate.merge(path, data);
+    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        getWriteDelegate().merge(path, data);
         completer.onComplete(null, null);
     }
 
     @Override
-    public void deleteData(final YangInstanceIdentifier path) {
-        delegate.delete(path);
+    public void deleteData(YangInstanceIdentifier path) {
+        getWriteDelegate().delete(path);
         completer.onComplete(null, null);
     }
 
     @Override
-    public void readData(final YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-
-        Futures.addCallback(delegate.read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+    public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
             @Override
             public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
@@ -72,8 +78,8 @@ final class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
-        Futures.addCallback(delegate.exists(path), new FutureCallback<Boolean>() {
+    public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
+        Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
                 proxyFuture.set(result);
@@ -89,7 +95,7 @@ final class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     private LocalThreePhaseCommitCohort ready() {
-        LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) delegate.ready();
+        LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
         completer.onComplete(null, null);
         return ready;
     }
@@ -111,6 +117,6 @@ final class LocalTransactionContext extends AbstractTransactionContext {
 
     @Override
     public void closeTransaction() {
-        delegate.close();
+        txDelegate.close();
     }
 }
index 8ca4424..d574e83 100644 (file)
@@ -8,7 +8,9 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
  * A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate
@@ -17,5 +19,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
  * @author Thomas Pantelis
  */
 interface LocalTransactionFactory {
+    DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier);
+
     DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
+
+    DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier);
 }
\ No newline at end of file
index dce9b5c..149b937 100644 (file)
@@ -11,8 +11,10 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
@@ -44,11 +46,21 @@ final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<Transa
         return dataTree;
     }
 
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier) {
+        return SnapshotBackedTransactions.newReadTransaction(identifier, false, dataTree.takeSnapshot());
+    }
+
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
         return SnapshotBackedTransactions.newReadWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
     }
 
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
+        return SnapshotBackedTransactions.newWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
+    }
+
     @Override
     protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx) {
         // No-op
index f4fa7b3..63d4c98 100644 (file)
@@ -64,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -120,10 +121,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final CountDownLatch waitTillReadyCountdownLatch;
 
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+            PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -134,6 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = primaryShardInfoCache;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -149,13 +154,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ClusterWrapper cluster,
         final Configuration configuration,
         final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch) {
+        final CountDownLatch waitTillReadyCountdownLatch,
+        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+                waitTillReadyCountdownLatch, primaryShardInfoCache));
     }
 
     @Override
@@ -212,7 +220,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
-            shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+                primaryShardInfoCache.remove(shardInformation.getShardName());
+            }
+
             checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
@@ -826,10 +837,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return false;
         }
 
-        void setLeaderId(String leaderId) {
+        boolean setLeaderId(String leaderId) {
+            boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
 
             notifyOnShardInitializedCallbacks();
+
+            return changed;
         }
     }
 
@@ -840,18 +854,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Configuration configuration;
         final DatastoreContext datastoreContext;
         private final CountDownLatch waitTillReadyCountdownLatch;
+        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster,
-                            Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
+                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+                    primaryShardInfoCache);
         }
     }
 
index b44f0b1..ff6471c 100644 (file)
@@ -153,7 +153,6 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory<Loca
 
         // Send a close transaction chain request to each and every shard
         getActorContext().broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
-        parent.removeTransactionChain(this);
     }
 
     private TransactionProxy allocateWriteTransaction(final TransactionType type) {
@@ -172,11 +171,6 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory<Loca
         return ret;
     }
 
-    @Override
-    protected DataTree dataTreeForFactory(final LocalTransactionChain factory) {
-        return factory.getDataTree();
-    }
-
     /**
      * This method is overridden to ensure the previous Tx's ready operations complete
      * before we initiate the next Tx in the chain to avoid creation failures if the
index 8d7ca99..49f2ea8 100644 (file)
@@ -8,13 +8,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import java.util.ArrayList;
 import java.util.Collection;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import scala.concurrent.Future;
@@ -25,14 +22,8 @@ import scala.concurrent.Future;
  */
 final class TransactionContextFactory extends AbstractTransactionContextFactory<LocalTransactionFactoryImpl> {
 
-    @GuardedBy("childChains")
-    private final Collection<TransactionChainProxy> childChains = new ArrayList<>();
-
-    private final ShardInfoListenerRegistration<TransactionContextFactory> reg;
-
     private TransactionContextFactory(final ActorContext actorContext) {
         super(actorContext);
-        this.reg = actorContext.registerShardInfoListener(this);
     }
 
     static TransactionContextFactory create(final ActorContext actorContext) {
@@ -41,7 +32,6 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory<
 
     @Override
     public void close() {
-        reg.close();
     }
 
     @Override
@@ -65,33 +55,6 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory<
     }
 
     DOMStoreTransactionChain createTransactionChain() {
-        final TransactionChainProxy ret = new TransactionChainProxy(this);
-
-        synchronized (childChains) {
-            childChains.add(ret);
-        }
-
-        return ret;
-    }
-
-    void removeTransactionChain(final TransactionChainProxy chain) {
-        synchronized (childChains) {
-            childChains.remove(chain);
-        }
-    }
-
-    @Override
-    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
-        synchronized (childChains) {
-            for (TransactionChainProxy chain : childChains) {
-                chain.onShardInfoUpdated(shardName, primaryShardInfo);
-            }
-            super.onShardInfoUpdated(shardName, primaryShardInfo);
-        }
-    }
-
-    @Override
-    protected DataTree dataTreeForFactory(final LocalTransactionFactoryImpl factory) {
-        return factory.getDataTree();
+        return new TransactionChainProxy(this);
     }
 }
index 5b4f54d..5c514cf 100644 (file)
@@ -15,25 +15,16 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.PoisonPill;
-import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
@@ -69,7 +60,7 @@ import scala.concurrent.duration.FiniteDuration;
  * easily. An ActorContext can be freely passed around to local object instances
  * but should not be passed to actors especially remote actors
  */
-public class ActorContext implements RemovalListener<String, Future<PrimaryShardInfo>> {
+public class ActorContext {
     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
@@ -104,29 +95,29 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
-    @GuardedBy("shardInfoListeners")
-    private final Collection<ShardInfoListenerRegistration<?>> shardInfoListeners = new ArrayList<>();
+
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
         this(actorSystem, shardManager, clusterWrapper, configuration,
-                DatastoreContext.newBuilder().build());
+                DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
     }
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
         this.actorSystem = actorSystem;
         this.shardManager = shardManager;
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
+        this.primaryShardInfoCache = primaryShardInfoCache;
 
         setCachedProperties();
 
@@ -150,11 +141,6 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
 
         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
-
-        primaryShardInfoCache = CacheBuilder.newBuilder()
-                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
-                .removalListener(this)
-                .build();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -243,13 +229,7 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
             DataTree localShardDataTree) {
         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
-        primaryShardInfoCache.put(shardName, Futures.successful(info));
-
-        synchronized (shardInfoListeners) {
-            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
-                reg.getInstance().onShardInfoUpdated(shardName, info);
-            }
-        }
+        primaryShardInfoCache.putSuccessful(shardName, info);
         return info;
     }
 
@@ -567,32 +547,7 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
         return ask(actorRef, message, timeout);
     }
 
-    @VisibleForTesting
-    Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+    public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
         return primaryShardInfoCache;
     }
-
-    public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
-        final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
-
-        synchronized (shardInfoListeners) {
-            shardInfoListeners.add(reg);
-        }
-        return reg;
-    }
-
-    protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
-        synchronized (shardInfoListeners) {
-            shardInfoListeners.remove(registration);
-        }
-    }
-
-    @Override
-    public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
-        synchronized (shardInfoListeners) {
-            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
-                reg.getInstance().onShardInfoUpdated(notification.getKey(), null);
-            }
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCache.java
new file mode 100644 (file)
index 0000000..70d9b94
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import scala.concurrent.Future;
+
+/**
+ * Maintains a cache of PrimaryShardInfo Future instances per shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfoFutureCache {
+    private final Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache = CacheBuilder.newBuilder().build();
+
+    public @Nullable Future<PrimaryShardInfo> getIfPresent(@Nonnull String shardName) {
+        return primaryShardInfoCache.getIfPresent(shardName);
+    }
+
+    public void putSuccessful(@Nonnull String shardName, @Nonnull PrimaryShardInfo info) {
+        primaryShardInfoCache.put(shardName, Futures.successful(info));
+    }
+
+    public void remove(@Nonnull String shardName) {
+        primaryShardInfoCache.invalidate(shardName);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java
deleted file mode 100644 (file)
index 83c5d37..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-
-/**
- * Listener interface used to register for primary shard information changes.
- * Implementations of this interface can be registered with {@link ActorContext}
- * to receive notifications about shard information changes.
- */
-public interface ShardInfoListener {
-    /**
-     * Update {@link PrimaryShardInfo} for a particular shard.
-     * @param shardName Shard name
-     * @param primaryShardInfo New {@link PrimaryShardInfo}, null if the information
-     *                         became unavailable.
-     */
-    void onShardInfoUpdated(@Nonnull String shardName, @Nullable PrimaryShardInfo primaryShardInfo);
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java
deleted file mode 100644 (file)
index 3dca66d..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-
-/**
- * Registration of a {@link ShardInfoListener} instance.
- *
- * @param <T> Type of listener
- */
-public class ShardInfoListenerRegistration<T extends ShardInfoListener> extends AbstractObjectRegistration<T> {
-    private final ActorContext parent;
-
-    protected ShardInfoListenerRegistration(final T instance, final ActorContext parent) {
-        super(instance);
-        this.parent = Preconditions.checkNotNull(parent);
-    }
-
-    @Override
-    protected void removeRegistration() {
-        parent.removeShardInfoListener(this);
-    }
-}
index 0b2d3ce..27fe3c5 100644 (file)
@@ -66,8 +66,6 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
@@ -154,8 +152,6 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-        doReturn(mock(ShardInfoListenerRegistration.class)).when(mockActorContext).registerShardInfoListener(
-                any(ShardInfoListener.class));
 
         mockComponentFactory = TransactionContextFactory.create(mockActorContext);
 
index 76ae3c7..5f21ee1 100644 (file)
@@ -8,13 +8,16 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -23,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
@@ -31,8 +35,15 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -237,6 +248,38 @@ public class DistributedDataStoreIntegrationTest {
         }};
     }
 
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            DistributedDataStore dataStore = setupDistributedDataStore(
+                    "testSingleTransactionsWritesInQuickSuccession", "cars-1");
+
+            DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+            writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            doCommit(writeTx.ready());
+
+            writeTx = txChain.newWriteOnlyTransaction();
+
+            int nCars = 5;
+            for(int i = 0; i < nCars; i++) {
+                writeTx.write(CarsModel.newCarPath("car" + i),
+                        CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+            }
+
+            doCommit(writeTx.ready());
+
+            Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+                    CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
+            cleanup(dataStore);
+        }};
+    }
+
     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
             final boolean writeOnly) throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
@@ -637,7 +680,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testTransactionAbort() throws Exception{
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
@@ -819,29 +861,45 @@ public class DistributedDataStoreIntegrationTest {
     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore = setupDistributedDataStore(
-                    "testCreateChainedTransactionsInQuickSuccession", "test-1");
+                    "testCreateChainedTransactionsInQuickSuccession", "cars-1");
 
-            DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+            ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+                    ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+                            LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
 
-            NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+            DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+            List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
 
-            int nTxs = 20;
-            List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
-            for(int i = 0; i < nTxs; i++) {
-                DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+            DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
+            writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            futures.add(writeTx.submit());
 
-                rwTx.merge(TestModel.TEST_PATH, testNode);
+            int nCars = 100;
+            for(int i = 0; i < nCars; i++) {
+                DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
 
-                cohorts.add(rwTx.ready());
+                rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
+                        CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
 
+                futures.add(rwTx.submit());
             }
 
-            for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-                doCommit(cohort);
+            for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
+                f.checkedGet();
             }
 
+            Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+                    LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
             txChain.close();
 
+            broker.close();
+
             cleanup(dataStore);
         }};
     }
index be8ede9..53b904e 100644 (file)
@@ -55,6 +55,11 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     private static final String[] SHARD_NAMES = {"cars", "people"};
 
+    private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+    private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
+
+    private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
+
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
 
@@ -62,7 +67,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(200);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
 
     private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
@@ -72,11 +77,10 @@ public class DistributedDataStoreRemotingIntegrationTest {
     @Before
     public void setUpClass() {
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
-        Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
-        Cluster.get(leaderSystem).join(member1Address);
+        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
-        Cluster.get(followerSystem).join(member1Address);
+        Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
     }
 
     @After
@@ -88,12 +92,10 @@ public class DistributedDataStoreRemotingIntegrationTest {
     private void initDatastores(String type) {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
-        String moduleShardsConfig = "module-shards-member1-and-2.conf";
-
         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
+        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
 
-        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, true, SHARD_NAMES);
+        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
     }
@@ -355,6 +357,58 @@ public class DistributedDataStoreRemotingIntegrationTest {
         assertEquals("isPresent", false, optional.isPresent());
     }
 
+    @Test
+    public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
+        String testName = "testSingleShardTransactionsWithLeaderChanges";
+        initDatastores(testName);
+
+        String followerCarShardName = "member-2-shard-cars-" + testName;
+        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
+
+        // Write top-level car container from the follower so it uses a remote Tx.
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+
+        // Switch the leader to the follower
+
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
+        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+
+        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
+
+        DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
+                shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+        IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
+        DistributedDataStore newMember1Datastore = newMember1TestKit.
+                    setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+
+        // Write a car entry to the new leader - should switch to local Tx
+
+        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
+        writeTx.merge(car1Path, car1);
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
+    }
+
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastores("testReadyLocalTransactionForwardedToLeader");
index 94f2085..53e43f8 100644 (file)
@@ -72,14 +72,7 @@ class IntegrationTestKit extends ShardTestKit {
 
     void waitUntilLeader(ActorContext actorContext, String... shardNames) {
         for(String shardName: shardNames) {
-            ActorRef shard = null;
-            for(int i = 0; i < 20 * 5 && shard == null; i++) {
-                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-                Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
-                if(shardReply.isPresent()) {
-                    shard = shardReply.get();
-                }
-            }
+            ActorRef shard = findLocalShard(actorContext, shardName);
 
             assertNotNull("Shard was not created", shard);
 
@@ -87,6 +80,27 @@ class IntegrationTestKit extends ShardTestKit {
         }
     }
 
+    void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
+        for(String shardName: shardNames) {
+            ActorRef shard = findLocalShard(actorContext, shardName);
+            assertNotNull("No local shard found", shard);
+
+            waitUntilNoLeader(shard);
+        }
+    }
+
+    private ActorRef findLocalShard(ActorContext actorContext, String shardName) {
+        ActorRef shard = null;
+        for(int i = 0; i < 20 * 5 && shard == null; i++) {
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
+            if(shardReply.isPresent()) {
+                shard = shardReply.get();
+            }
+        }
+        return shard;
+    }
+
     void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
             NormalizedNode<?, ?> nodeToWrite) throws Exception {
 
index 7ca9f90..7593eea 100644 (file)
@@ -12,7 +12,9 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -32,7 +34,17 @@ public class LocalTransactionContextTest {
     @Before
     public void setUp(){
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter));
+        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)) {
+            @Override
+            protected DOMStoreWriteTransaction getWriteDelegate() {
+                return readWriteTransaction;
+            }
+
+            @Override
+            protected DOMStoreReadTransaction getReadDelegate() {
+                return readWriteTransaction;
+            }
+        };
     }
 
     @Test
index 645890d..5f6973f 100644 (file)
@@ -60,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
@@ -93,6 +94,8 @@ public class ShardManagerTest extends AbstractActorTest {
         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
     }
 
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
@@ -112,9 +115,9 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.clear();
     }
 
-    private Props newShardMgrProps() {
+    private Props newShardMgrProps(boolean persistent) {
         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                datastoreContextBuilder.build(), ready);
+                datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -129,7 +132,7 @@ public class ShardManagerTest extends AbstractActorTest {
             @Override
             public ShardManager create() throws Exception {
                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
-                        ready, name, shardActor);
+                        ready, name, shardActor, primaryShardInfoCache);
             }
         };
 
@@ -529,7 +532,7 @@ public class ShardManagerTest extends AbstractActorTest {
             throws Exception {
         new JavaTestKit(getSystem()) {{
             final TestActorRef<ShardManager> shardManager =
-                    TestActorRef.create(getSystem(), newShardMgrProps());
+                    TestActorRef.create(getSystem(), newShardMgrProps(true));
 
             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
 
@@ -564,7 +567,7 @@ public class ShardManagerTest extends AbstractActorTest {
             throws Exception {
         new JavaTestKit(getSystem()) {{
             final TestActorRef<ShardManager> shardManager =
-                    TestActorRef.create(getSystem(), newShardMgrProps());
+                    TestActorRef.create(getSystem(), newShardMgrProps(true));
 
             SchemaContext schemaContext = mock(SchemaContext.class);
             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
@@ -601,10 +604,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRecoveryApplicable(){
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build(), ready);
+                final Props persistentProps = newShardMgrProps(true);
                 final TestActorRef<ShardManager> persistentShardManager =
                         TestActorRef.create(getSystem(), persistentProps);
 
@@ -612,10 +612,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
 
-                final Props nonPersistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(false).build(), ready);
+                final Props nonPersistentProps = newShardMgrProps(false);
                 final TestActorRef<ShardManager> nonPersistentShardManager =
                         TestActorRef.create(getSystem(), nonPersistentProps);
 
@@ -636,7 +633,8 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
+                        ready, new PrimaryShardInfoFutureCache()) {
                     @Override
                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
@@ -674,7 +672,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -694,7 +692,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -716,7 +714,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -738,7 +736,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
@@ -751,10 +749,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testByDefaultSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final Props persistentProps = newShardMgrProps(true);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -768,7 +763,7 @@ public class ShardManagerTest extends AbstractActorTest {
         final Props persistentProps = ShardManager.props(
                 new MockClusterWrapper(),
                 new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -781,10 +776,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final Props persistentProps = newShardMgrProps(true);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -805,7 +797,7 @@ public class ShardManagerTest extends AbstractActorTest {
         final Props persistentProps = ShardManager.props(
                 new MockClusterWrapper(),
                 new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -838,7 +830,7 @@ public class ShardManagerTest extends AbstractActorTest {
                         return Arrays.asList("default", "astronauts");
                     }
                 },
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -881,7 +873,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         TestShardManager(String shardMrgIDSuffix) {
             super(new MockClusterWrapper(), new MockConfiguration(),
-                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
+                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
+                    new PrimaryShardInfoFutureCache());
         }
 
         @Override
@@ -939,8 +932,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
-                ActorRef shardActor) {
-            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+                ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
             this.shardActor = shardActor;
             this.name = name;
         }
index 2d1b14c..7887290 100644 (file)
@@ -68,4 +68,25 @@ public class ShardTestKit extends JavaTestKit {
         Assert.fail("Leader not found for shard " + shard.path());
     }
 
+    protected void waitUntilNoLeader(ActorRef shard) {
+        FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+        for(int i = 0; i < 20 * 5; i++) {
+            Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
+            try {
+                FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
+                if(resp.getLeaderActor() == null) {
+                    return;
+                }
+            } catch(TimeoutException e) {
+            } catch(Exception e) {
+                System.err.println("FindLeader threw ex");
+                e.printStackTrace();
+            }
+
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Unexpected leader found for shard " + shard.path());
+    }
 }
\ No newline at end of file
index 377c05b..4ee89ca 100644 (file)
@@ -331,7 +331,7 @@ public class ActorContextTest extends AbstractActorTest{
     public void testClientDispatcherIsGlobalDispatcher(){
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), DatastoreContext.newBuilder().build());
+                        mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
 
         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -343,7 +343,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         ActorContext actorContext =
                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), DatastoreContext.newBuilder().build());
+                        mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
 
         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -356,7 +356,7 @@ public class ActorContextTest extends AbstractActorTest{
         new JavaTestKit(getSystem()) {{
             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                             mock(Configuration.class), DatastoreContext.newBuilder().
-                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
 
             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
             assertEquals("getTransactionCommitOperationTimeout", 7,
@@ -389,7 +389,7 @@ public class ActorContextTest extends AbstractActorTest{
             final String expPrimaryPath = "akka://test-system/find-primary-shard";
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
@@ -410,9 +410,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             assertEquals(cachedInfo, actual);
 
-            // Wait for 200 Milliseconds. The cached entry should have been removed.
-
-            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+            actorContext.getPrimaryShardInfoCache().remove("foobar");
 
             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
@@ -432,7 +430,7 @@ public class ActorContextTest extends AbstractActorTest{
             final String expPrimaryPath = "akka://test-system/find-primary-shard";
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
@@ -454,9 +452,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             assertEquals(cachedInfo, actual);
 
-            // Wait for 200 Milliseconds. The cached entry should have been removed.
-
-            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+            actorContext.getPrimaryShardInfoCache().remove("foobar");
 
             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
@@ -474,7 +470,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new PrimaryNotFoundException("not found"));
@@ -507,7 +503,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext) {
+                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new NotInitializedException("not iniislized"));
@@ -547,7 +543,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
                     mock(ClusterWrapper.class), mockConfig,
-                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
 
             actorContext.broadcast(new TestMessage());
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java
new file mode 100644 (file)
index 0000000..9d34ce5
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import scala.concurrent.Future;
+
+/**
+ * Unit tests for PrimaryShardInfoFutureCache.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfoFutureCacheTest {
+
+    @Test
+    public void testOperations() {
+        PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
+
+        assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
+
+        PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), Optional.<DataTree>absent());
+        cache.putSuccessful("foo", shardInfo);
+
+        Future<PrimaryShardInfo> future = cache.getIfPresent("foo");
+        assertNotNull("Null future", future);
+        assertEquals("getIfPresent", shardInfo, future.value().get().get());
+
+        cache.remove("foo");
+
+        assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
+    }
+}