Merge "Bug 2412: Expose Mountpoints on proper path"
authorTony Tkacik <ttkacik@cisco.com>
Wed, 11 Mar 2015 06:13:38 +0000 (06:13 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 11 Mar 2015 06:13:32 +0000 (06:13 +0000)
21 files changed:
features/mdsal/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java with 83% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java [new file with mode: 0644]

index 5b5c1b94e09f78cada5baf183648ee30054fa0ab..bd0a99d9cae3f00373a7c9044823d913a96bfc8c 100644 (file)
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-ssh</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.opendaylight.controller.model</groupId>
index 3ce1f5d1e89038c7d4338f099cb0f49b909043cc..72b5ac95153f07c1b315e01c809f0b76f985b433 100644 (file)
@@ -18,12 +18,18 @@ package org.opendaylight.controller.cluster.raft.base.messages;
  */
 public class FollowerInitialSyncUpStatus {
     private final boolean initialSyncDone;
+    private final String name;
 
-    public FollowerInitialSyncUpStatus(boolean initialSyncDone){
+    public FollowerInitialSyncUpStatus(boolean initialSyncDone, String name){
         this.initialSyncDone = initialSyncDone;
+        this.name = name;
     }
 
     public boolean isInitialSyncDone() {
         return initialSyncDone;
     }
+
+    public String getName() {
+        return name;
+    }
 }
index ef5f11e37aef4fe7490887a27d91caedb0e50c51..e814cd000dda4ff2dddbc38665a37c80fc818bdf 100644 (file)
@@ -476,4 +476,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         }
     }
 
+    protected String getId(){
+        return context.getId();
+    }
+
 }
index 618865cb88eb8877cdcfdcfb29208c80707c2c0f..0f251a3012e7afe492b867ced86859316fde6d88 100644 (file)
@@ -352,7 +352,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return snapshotTracker;
     }
 
-    private static class InitialSyncStatusTracker {
+    private class InitialSyncStatusTracker {
 
         private static final long INVALID_LOG_INDEX = -2L;
         private long initialLeaderCommit = INVALID_LOG_INDEX;
@@ -374,10 +374,10 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if(!initialSyncUpDone){
                 if(initialLeaderCommit == INVALID_LOG_INDEX){
-                    actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender());
+                    actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender());
                     initialLeaderCommit = leaderCommit;
                 } else if(commitIndex >= initialLeaderCommit){
-                    actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender());
+                    actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender());
                     initialSyncUpDone = true;
                 }
             }
index 80aa3793c1a2daf140441152c80a03b2408b8dd6..ec867dda0bb924d7ffbab7ca3ded58f52ad90fb8 100644 (file)
@@ -12,7 +12,7 @@ import com.google.common.base.Preconditions;
 import java.util.concurrent.Semaphore;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 
-final class OperationCompleter extends OnComplete<Object> {
+public final class OperationCompleter extends OnComplete<Object> {
     private final Semaphore operationLimiter;
 
     OperationCompleter(Semaphore operationLimiter){
index 52b4652de6e7d464d9d54c67ebea37ca233de862..e704e42465b99e1183ca20c19742ccd2424e410f 100644 (file)
@@ -67,6 +67,7 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
@@ -295,6 +296,9 @@ public class Shard extends RaftActor {
                 onDatastoreContext((DatastoreContext)message);
             } else if(message instanceof RegisterRoleChangeListener){
                 roleChangeNotifier.get().forward(message, context());
+            } else if (message instanceof FollowerInitialSyncUpStatus){
+                shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+                context().parent().tell(message, self());
             } else {
                 super.onReceiveCommand(message);
             }
index c441afb49787fb7d5ae946c7fc0e0e91ec7137ad..136c6813eaba9d7d116b4ba0b4609bbd4848fb13 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -55,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -166,16 +168,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             ignoreMessage(message);
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
-        } else if(message instanceof RoleChangeNotification){
+        } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
+        } else if(message instanceof FollowerInitialSyncUpStatus){
+            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else{
             unknownMessage(message);
         }
 
     }
 
-    private void onRoleChangeNotification(RoleChangeNotification message) {
-        RoleChangeNotification roleChanged = message;
+    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+        LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
+                status.isInitialSyncDone());
+
+        ShardInformation shardInformation = findShardInformation(status.getName());
+
+        if(shardInformation != null) {
+            shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
+
+            mBean.setSyncStatus(isInSync());
+        }
+
+    }
+
+    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
         LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
@@ -189,6 +206,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 waitTillReadyCountdownLatch.countDown();
             }
+
+            mBean.setSyncStatus(isInSync());
         }
     }
 
@@ -214,6 +233,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return isReady;
     }
 
+    private boolean isInSync(){
+        for (ShardInformation info : localShards.values()) {
+            if(!info.isInSync()){
+                return false;
+            }
+        }
+        return true;
+    }
+
     private void onActorInitialized(Object message) {
         final ActorRef sender = getSender();
 
@@ -519,6 +547,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return dataPersistenceProvider;
     }
 
+    @VisibleForTesting
+    ShardManagerInfoMBean getMBean(){
+        return mBean;
+    }
+
     private class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
@@ -529,6 +562,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
+        private boolean followerSyncStatus = false;
+
         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
         private String role ;
 
@@ -607,6 +642,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return this.role;
         }
 
+        public void setFollowerSyncStatus(boolean syncStatus){
+            this.followerSyncStatus = syncStatus;
+        }
+
+        public boolean isInSync(){
+            if(RaftState.Follower.name().equals(this.role)){
+                return followerSyncStatus;
+            } else if(RaftState.Leader.name().equals(this.role)){
+                return true;
+            }
+
+            return false;
+        }
+
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
index ee3a5cc82573d6e415a5bbcd080277673e4cb051..58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc 100644 (file)
@@ -175,45 +175,47 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
         /**
          * This method is overridden to ensure the previous Tx's ready operations complete
-         * before we create the next shard Tx in the chain to avoid creation failures if the
+         * before we initiate the next Tx in the chain to avoid creation failures if the
          * previous Tx's ready operations haven't completed yet.
          */
         @Override
-        protected Future<Object> sendCreateTransaction(final ActorSelection shard,
-                final Object serializedCreateMessage) {
-
+        protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
             // Check if there are any previous ready Futures, otherwise let the super class handle it.
             if(previousReadyFutures.isEmpty()) {
-                return super.sendCreateTransaction(shard, serializedCreateMessage);
+                return super.sendFindPrimaryShardAsync(shardName);
+            }
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+                        previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
             }
 
             // Combine the ready Futures into 1.
             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
-                    previousReadyFutures, getActorContext().getActorSystem().dispatcher());
+                    previousReadyFutures, getActorContext().getClientDispatcher());
 
             // Add a callback for completion of the combined Futures.
-            final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+            final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
                 @Override
                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
                     if(failure != null) {
                         // A Ready Future failed so fail the returned Promise.
-                        createTxPromise.failure(failure);
+                        returnPromise.failure(failure);
                     } else {
-                        LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
+                        LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
                                 getIdentifier(), getTransactionChainId());
 
-                        // Send the CreateTx message and use the resulting Future to complete the
+                        // Send the FindPrimaryShard message and use the resulting Future to complete the
                         // returned Promise.
-                        createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
-                                serializedCreateMessage));
+                        returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
                     }
                 }
             };
 
-            combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
 
-            return createTxPromise.future();
+            return returnPromise.future();
         }
     }
 }
index 1e222e4c0a667307ce9b7cf3c24585b7f61d5911..c1f9c78e69ec683586147e01605ab7168f786e1b 100644 (file)
@@ -37,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-class TransactionContextImpl extends AbstractTransactionContext {
+public class TransactionContextImpl extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
     private final ActorContext actorContext;
@@ -49,7 +49,7 @@ class TransactionContextImpl extends AbstractTransactionContext {
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
 
-    TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+    protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, SchemaContext schemaContext,
             boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
         super(identifier);
@@ -153,8 +153,8 @@ class TransactionContextImpl extends AbstractTransactionContext {
 
                 } else {
                     // Throwing an exception here will fail the Future.
-                    throw new IllegalArgumentException(String.format("Invalid reply type %s",
-                            serializedReadyReply.getClass()));
+                    throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                            identifier, serializedReadyReply.getClass()));
                 }
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
index 0bc82af3358c4747901fab005db5c19cf2d41288..64b9086c250c16f759a417003d0cefc9839f688b 100644 (file)
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -425,18 +426,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
     }
 
-    /**
-     * Method called to send a CreateTransaction message to a shard.
-     *
-     * @param shard the shard actor to send to
-     * @param serializedCreateMessage the serialized message to send
-     * @return the response Future
-     */
-    protected Future<Object> sendCreateTransaction(ActorSelection shard,
-            Object serializedCreateMessage) {
-        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
-    }
-
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -465,14 +454,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
+    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+        return actorContext.findPrimaryShardAsync(shardName);
+    }
+
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
+            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
-            final TransactionFutureCallback newTxFutureCallback =
-                    new TransactionFutureCallback(shardName);
+            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
@@ -598,10 +590,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
-                    new CreateTransaction(identifier.toString(),
-                            TransactionProxy.this.transactionType.ordinal(),
-                            getTransactionChainId()).toSerializable());
+            Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
+                    TransactionProxy.this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable();
+
+            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
 
             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
         }
@@ -731,7 +724,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 return new TransactionContextImpl(transactionPath, transactionActor, identifier,
                     actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
             } else {
-                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
                         actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
             }
         }
@@ -5,9 +5,11 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.datastore.compat;
 
 import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.OperationCompleter;
+import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -23,9 +25,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  *
  * @author Thomas Pantelis
  */
-class LegacyTransactionContextImpl extends TransactionContextImpl {
+public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
-    LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationCompleter operationCompleter) {
         super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
index 945ae0a4786ab931a62c071533ec89842847a84e..6222d3be09fce8ba46ee0a8b94cfd4e4bd32fe41 100644 (file)
@@ -67,6 +67,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
+    private boolean followerInitialSyncStatus = false;
+
     public ShardStats(final String shardName, final String mxBeanType) {
         super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
     }
@@ -276,4 +278,13 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
     public void setDataStore(final InMemoryDOMDataStore store) {
         setNotificationManager(store.getDataChangeListenerNotificationManager());
     }
+
+    public void setFollowerInitialSyncStatus(boolean followerInitialSyncStatus) {
+        this.followerInitialSyncStatus = followerInitialSyncStatus;
+    }
+
+    @Override
+    public boolean getFollowerInitialSyncStatus() {
+        return followerInitialSyncStatus;
+    }
 }
index 99c8daf87d30af3ce66bf3b5c42aa86133ec5575..8adc8b24b27c1ad4d3eff65730cf9d27646fd3d0 100644 (file)
@@ -18,6 +18,8 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo
 
     private final List<String> localShards;
 
+    private boolean syncStatus = false;
+
     public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
         super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
         this.localShards = localShards;
@@ -36,4 +38,13 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo
     public List<String> getLocalShards() {
         return localShards;
     }
+
+    @Override
+    public boolean getSyncStatus() {
+        return this.syncStatus;
+    }
+
+    public void setSyncStatus(boolean syncStatus){
+        this.syncStatus = syncStatus;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
new file mode 100644 (file)
index 0000000..4896b05
--- /dev/null
@@ -0,0 +1,408 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Abstract base class for TransactionProxy unit tests.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractTransactionProxyTest {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static ActorSystem system;
+
+    private final Configuration configuration = new MockConfiguration();
+
+    @Mock
+    protected ActorContext mockActorContext;
+
+    private SchemaContext schemaContext;
+
+    @Mock
+    private ClusterWrapper mockClusterWrapper;
+
+    protected final String memberName = "mock-member";
+
+    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
+            shardBatchedModificationCount(1);
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+
+        Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
+                put("akka.actor.default-dispatcher.type",
+                        "akka.testkit.CallingThreadDispatcherConfigurator").build()).
+                withFallback(ConfigFactory.load());
+        system = ActorSystem.create("test", config);
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws IOException {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+
+        schemaContext = TestModel.createTestContext();
+
+        doReturn(getSystem()).when(mockActorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        ShardStrategyFactory.setConfiguration(configuration);
+    }
+
+    protected ActorSystem getSystem() {
+        return system;
+    }
+
+    protected CreateTransaction eqCreateTransaction(final String memberName,
+            final TransactionType type) {
+        ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                    return obj.getTransactionId().startsWith(memberName) &&
+                            obj.getTransactionType() == type.ordinal();
+                }
+
+                return false;
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected DataExists eqSerializedDataExists() {
+        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+            @Override
+            public boolean matches(Object argument) {
+                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected DataExists eqDataExists() {
+        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+            @Override
+            public boolean matches(Object argument) {
+                return (argument instanceof DataExists) &&
+                    ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected ReadData eqSerializedReadData() {
+        return eqSerializedReadData(TestModel.TEST_PATH);
+    }
+
+    protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
+        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       ReadData.fromSerializable(argument).getPath().equals(path);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected ReadData eqReadData() {
+        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return (argument instanceof ReadData) &&
+                    ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected Future<Object> readySerializedTxReply(String path) {
+        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
+    }
+
+    protected Future<Object> readyTxReply(String path) {
+        return Futures.successful((Object)new ReadyTransactionReply(path));
+    }
+
+    protected Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+            short transactionVersion) {
+        return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
+    }
+
+    protected Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
+        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    protected Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
+        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
+    }
+
+    protected Future<Object> dataExistsSerializedReply(boolean exists) {
+        return Futures.successful(new DataExistsReply(exists).toSerializable());
+    }
+
+    protected Future<DataExistsReply> dataExistsReply(boolean exists) {
+        return Futures.successful(new DataExistsReply(exists));
+    }
+
+    protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+        return Futures.successful(new BatchedModificationsReply(count));
+    }
+
+    protected Future<Object> incompleteFuture(){
+        return mock(Future.class);
+    }
+
+    protected ActorSelection actorSelection(ActorRef actorRef) {
+        return getSystem().actorSelection(actorRef.path());
+    }
+
+    protected void expectBatchedModifications(ActorRef actorRef, int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+    }
+
+    protected void expectBatchedModifications(int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
+    }
+
+    protected void expectIncompleteBatchedModifications() {
+        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
+    }
+
+    protected void expectReadyTransaction(ActorRef actorRef) {
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+    }
+
+    protected void expectFailedBatchedModifications(ActorRef actorRef) {
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+    }
+
+    protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
+        return CreateTransactionReply.newBuilder()
+            .setTransactionActorPath(actorRef.path().toString())
+            .setTransactionId("txn-1")
+            .setMessageVersion(transactionVersion)
+            .build();
+    }
+
+    protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
+        ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+        log.info("Created mock shard actor {}", actorRef);
+
+        doReturn(actorSystem.actorSelection(actorRef.path())).
+                when(mockActorContext).actorSelection(actorRef.path().toString());
+
+        doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+                when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        return actorRef;
+    }
+
+    protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
+        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
+                memberName, shardActorRef);
+    }
+
+    protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
+
+        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+        log.info("Created mock shard Tx actor {}", txActorRef);
+
+        doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection(
+                txActorRef.path().toString());
+
+        doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(prefix, type));
+
+        return txActorRef;
+    }
+
+    protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
+    }
+
+
+    protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+            throws Throwable {
+
+        try {
+            future.checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected ReadFailedException");
+        } catch(ReadFailedException e) {
+            throw e.getCause();
+        }
+    }
+
+    protected List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
+                ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
+                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+
+        List<BatchedModifications> batchedModifications = filterCaptured(
+                batchedModificationsCaptor, BatchedModifications.class);
+        return batchedModifications;
+    }
+
+    protected <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+        List<T> captured = new ArrayList<>();
+        for(T c: captor.getAllValues()) {
+            if(type.isInstance(c)) {
+                captured.add(c);
+            }
+        }
+
+        return captured;
+    }
+
+    protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), expected);
+    }
+
+    protected void verifyBatchedModifications(Object message, Modification... expected) {
+        assertEquals("Message type", BatchedModifications.class, message.getClass());
+        BatchedModifications batchedModifications = (BatchedModifications)message;
+        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+        for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
+            Modification actual = batchedModifications.getModifications().get(i);
+            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
+            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
+                    ((AbstractModification)actual).getPath());
+            if(actual instanceof WriteModification) {
+                assertEquals("getData", ((WriteModification)expected[i]).getData(),
+                        ((WriteModification)actual).getData());
+            }
+        }
+    }
+
+    protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+            Object... expReplies) throws Exception {
+            assertEquals("getReadyOperationFutures size", expReplies.length,
+                    proxy.getCohortFutures().size());
+
+            int i = 0;
+            for( Future<ActorSelection> future: proxy.getCohortFutures()) {
+                assertNotNull("Ready operation Future is null", future);
+
+                Object expReply = expReplies[i++];
+                if(expReply instanceof ActorSelection) {
+                    ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                    assertEquals("Cohort actor path", expReply, actual);
+                } else {
+                    try {
+                        Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                        fail("Expected exception from ready operation Future");
+                    } catch(Exception e) {
+                        assertTrue(String.format("Expected exception type %s. Actual %s",
+                                expReply, e.getClass()), ((Class<?>)expReply).isInstance(e));
+                    }
+                }
+            }
+        }
+}
index f0cdacc9ef2bc0e08ccb0348a8d990279de8f57a..c005751380517d626cf5dc8012d71016e41c0962 100644 (file)
@@ -20,8 +20,10 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -48,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -469,6 +472,132 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
 
+    @Test
+    public void testByDefaultSyncStatusIsFalse() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+    }
+
+    @Test
+    public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Follower.name(), RaftState.Leader.name()));
+
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+    }
+
+    @Test
+    public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Follower.name(), RaftState.Candidate.name()));
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+    }
+
+    @Test
+    public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration(),
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Candidate.name(), RaftState.Follower.name()));
+
+        // Initially will be false
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Send status true will make sync status true
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+        // Send status false will make sync status false
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
+
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+    }
+
+    @Test
+    public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
+        final Props persistentProps = ShardManager.props(
+                new MockClusterWrapper(),
+                new MockConfiguration() {
+                    @Override
+                    public List<String> getMemberShardNames(String memberName) {
+                        return Arrays.asList("default", "astronauts");
+                    }
+                },
+                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final TestActorRef<ShardManager> shardManager =
+                TestActorRef.create(getSystem(), persistentProps);
+
+        ShardManager shardManagerActor = shardManager.underlyingActor();
+
+        // Initially will be false
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make default shard leader
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+                RaftState.Follower.name(), RaftState.Leader.name()));
+
+        // default = Leader, astronauts is unknown so sync status remains false
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make astronauts shard leader as well
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+                RaftState.Follower.name(), RaftState.Leader.name()));
+
+        // Now sync status should be true
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make astronauts a Follower
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+                RaftState.Leader.name(), RaftState.Follower.name()));
+
+        // Sync status is not true
+        assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+        // Make the astronauts follower sync status true
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
+
+        // Sync status is now true
+        assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+    }
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
index 7dfbd668b811231b4b32edd76a04987c671aba99..d930b2519e379f9b1f9bb54a5482e25857cb1faa 100644 (file)
@@ -84,6 +84,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@ -1618,7 +1619,25 @@ public class ShardTest extends AbstractActorTest {
                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
 
                 assertEquals(1, allMatching.size());
-            }};
+            }
+        };
+    }
+
+    @Test
+    public void testFollowerInitialSyncStatus() throws Exception {
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testFollowerInitialSyncStatus");
+
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
+
+        assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
+
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
+
+        assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
 
index 88ab0dd292b4894f0eac4c8ae782452d9d696471..4f00ed5f4bcfe4e88908d9bf2ca25417c3658e18 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import akka.actor.ActorRef;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Promise;
 
-public class TransactionChainProxyTest extends AbstractActorTest{
-    ActorContext actorContext = null;
-    SchemaContext schemaContext = mock(SchemaContext.class);
-
-    @Mock
-    ActorContext mockActorContext;
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-
-        actorContext = new MockActorContext(getSystem());
-        actorContext.setSchemaContext(schemaContext);
-
-        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
-        doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
-    }
+public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @SuppressWarnings("resource")
     @Test
     public void testNewReadOnlyTransaction() throws Exception {
 
-     DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction();
+     DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
 
     }
@@ -58,7 +55,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{
     @SuppressWarnings("resource")
     @Test
     public void testNewReadWriteTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
 
     }
@@ -66,18 +63,16 @@ public class TransactionChainProxyTest extends AbstractActorTest{
     @SuppressWarnings("resource")
     @Test
     public void testNewWriteOnlyTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
 
     }
 
     @Test
     public void testClose() throws Exception {
-        ActorContext context = mock(ActorContext.class);
+        new TransactionChainProxy(mockActorContext).close();
 
-        new TransactionChainProxy(context).close();
-
-        verify(context, times(1)).broadcast(anyObject());
+        verify(mockActorContext, times(1)).broadcast(anyObject());
     }
 
     @Test
@@ -115,4 +110,93 @@ public class TransactionChainProxyTest extends AbstractActorTest{
 
         verify(mockActorContext, times(0)).acquireTxCreationPermit();
     }
+
+    /**
+     * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
+     * initiated until the first one completes its read future.
+     */
+    @Test
+    public void testChainedReadWriteTransactions() throws Exception {
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(txActorRef1, 1);
+
+        Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
+        doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
+
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+        writeTx1.ready();
+
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+
+        String tx2MemberName = "tx2MemberName";
+        doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
+        ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+        ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
+                DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
+
+        expectBatchedModifications(txActorRef2, 1);
+
+        final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
+
+        final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+        final CountDownLatch write2Complete = new CountDownLatch(1);
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+                } catch (Exception e) {
+                    caughtEx.set(e);
+                } finally {
+                    write2Complete.countDown();
+                }
+            }
+        }.start();
+
+        assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+        if(caughtEx.get() != null) {
+            throw caughtEx.get();
+        }
+
+        try {
+            verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
+                    eqCreateTransaction(tx2MemberName, READ_WRITE));
+        } catch (AssertionError e) {
+            fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+        }
+
+        readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+
+        verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
+                eqCreateTransaction(tx2MemberName, READ_WRITE));
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+        expectBatchedModifications(actorRef, 1);
+
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        txChainProxy.newWriteOnlyTransaction();
+    }
 }
index abfe7eae22a15b69fd4dc1c71df46befe5059e31..8278d3cffceaa6b82357c08c4ab62e33ff53fafe 100644 (file)
@@ -6,11 +6,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -21,78 +19,44 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
-import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
 import org.mockito.InOrder;
-import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
-public class TransactionProxyTest {
+public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @SuppressWarnings("serial")
     static class TestException extends RuntimeException {
@@ -102,291 +66,6 @@ public class TransactionProxyTest {
         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
     }
 
-    private static ActorSystem system;
-
-    private final Configuration configuration = new MockConfiguration();
-
-    @Mock
-    private ActorContext mockActorContext;
-
-    private SchemaContext schemaContext;
-
-    @Mock
-    private ClusterWrapper mockClusterWrapper;
-
-    private final String memberName = "mock-member";
-
-    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
-            shardBatchedModificationCount(1);
-
-    @BeforeClass
-    public static void setUpClass() throws IOException {
-
-        Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
-                put("akka.actor.default-dispatcher.type",
-                        "akka.testkit.CallingThreadDispatcherConfigurator").build()).
-                withFallback(ConfigFactory.load());
-        system = ActorSystem.create("test", config);
-    }
-
-    @AfterClass
-    public static void tearDownClass() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
-        system = null;
-    }
-
-    @Before
-    public void setUp(){
-        MockitoAnnotations.initMocks(this);
-
-        schemaContext = TestModel.createTestContext();
-
-        doReturn(getSystem()).when(mockActorContext).getActorSystem();
-        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
-        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
-        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
-        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
-        ShardStrategyFactory.setConfiguration(configuration);
-    }
-
-    private ActorSystem getSystem() {
-        return system;
-    }
-
-    private CreateTransaction eqCreateTransaction(final String memberName,
-            final TransactionType type) {
-        ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
-                    return obj.getTransactionId().startsWith(memberName) &&
-                            obj.getTransactionType() == type.ordinal();
-                }
-
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DataExists eqSerializedDataExists() {
-        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
-            @Override
-            public boolean matches(Object argument) {
-                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DataExists eqDataExists() {
-        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
-            @Override
-            public boolean matches(Object argument) {
-                return (argument instanceof DataExists) &&
-                    ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private ReadData eqSerializedReadData() {
-        return eqSerializedReadData(TestModel.TEST_PATH);
-    }
-
-    private ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
-        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       ReadData.fromSerializable(argument).getPath().equals(path);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private ReadData eqReadData() {
-        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return (argument instanceof ReadData) &&
-                    ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
-                    WriteData obj = WriteData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
-                }
-
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
-                    MergeData obj = MergeData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
-                }
-
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
-        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
-                       DeleteData.fromSerializable(argument).getPath().equals(expPath);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private Future<Object> readySerializedTxReply(String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
-    }
-
-    private Future<Object> readyTxReply(String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path));
-    }
-
-    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
-            short transactionVersion) {
-        return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
-    }
-
-    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
-        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
-    }
-
-    private Future<Object> dataExistsSerializedReply(boolean exists) {
-        return Futures.successful(new DataExistsReply(exists).toSerializable());
-    }
-
-    private Future<DataExistsReply> dataExistsReply(boolean exists) {
-        return Futures.successful(new DataExistsReply(exists));
-    }
-
-    private Future<BatchedModificationsReply> batchedModificationsReply(int count) {
-        return Futures.successful(new BatchedModificationsReply(count));
-    }
-
-    private Future<Object> incompleteFuture(){
-        return mock(Future.class);
-    }
-
-    private ActorSelection actorSelection(ActorRef actorRef) {
-        return getSystem().actorSelection(actorRef.path());
-    }
-
-    private void expectBatchedModifications(ActorRef actorRef, int count) {
-        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
-    }
-
-    private void expectBatchedModifications(int count) {
-        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
-    }
-
-    private void expectIncompleteBatchedModifications() {
-        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
-    }
-
-    private void expectReadyTransaction(ActorRef actorRef) {
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-    }
-
-    private void expectFailedBatchedModifications(ActorRef actorRef) {
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
-    }
-
-    private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
-        return CreateTransactionReply.newBuilder()
-            .setTransactionActorPath(actorRef.path().toString())
-            .setTransactionId("txn-1")
-            .setMessageVersion(transactionVersion)
-            .build();
-    }
-
-    private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
-        ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-        doReturn(actorSystem.actorSelection(actorRef.path())).
-                when(mockActorContext).actorSelection(actorRef.path().toString());
-
-        doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
-                when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
-        return actorRef;
-    }
-
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
-            TransactionType type, int transactionVersion) {
-        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
-
-        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
-                executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
-                        eqCreateTransaction(memberName, type));
-
-        return actorRef;
-    }
-
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
-    }
-
-
-    private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
-            throws Throwable {
-
-        try {
-            future.checkedGet(5, TimeUnit.SECONDS);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            throw e.getCause();
-        }
-    }
-
     @Test
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
@@ -840,31 +519,6 @@ public class TransactionProxyTest {
                 BatchedModificationsReply.class);
     }
 
-    private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
-        Object... expReplies) throws Exception {
-        assertEquals("getReadyOperationFutures size", expReplies.length,
-                proxy.getCohortFutures().size());
-
-        int i = 0;
-        for( Future<ActorSelection> future: proxy.getCohortFutures()) {
-            assertNotNull("Ready operation Future is null", future);
-
-            Object expReply = expReplies[i++];
-            if(expReply instanceof ActorSelection) {
-                ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                assertEquals("Cohort actor path", expReply, actual);
-            } else {
-                // Expecting exception.
-                try {
-                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                    fail("Expected exception from ready operation Future");
-                } catch(Exception e) {
-                    // Expected
-                }
-            }
-        }
-    }
-
     @Test
     public void testReady() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
@@ -898,74 +552,6 @@ public class TransactionProxyTest {
                 isA(BatchedModifications.class));
     }
 
-    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
-                READ_WRITE, version);
-
-        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
-
-        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
-
-        doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
-
-        doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
-
-        expectReadyTransaction(actorRef);
-
-        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
-        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
-                get(5, TimeUnit.SECONDS);
-
-        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-        assertEquals("Response NormalizedNode", testNode, readOptional.get());
-
-        transactionProxy.write(TestModel.TEST_PATH, testNode);
-
-        transactionProxy.merge(TestModel.TEST_PATH, testNode);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class,
-                ShardTransactionMessages.DeleteDataReply.class);
-
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
-
-        return actorRef;
-    }
-
-    @Test
-    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
-        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
-
-        verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-    }
-
-    @Test
-    public void testCompatibilityWithHeliumR1Version() throws Exception {
-        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
-
-        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-    }
-
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
@@ -1777,49 +1363,4 @@ public class TransactionProxyTest {
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
     }
-
-    private List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
-        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
-                ArgumentCaptor.forClass(BatchedModifications.class);
-        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
-                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
-
-        List<BatchedModifications> batchedModifications = filterCaptured(
-                batchedModificationsCaptor, BatchedModifications.class);
-        return batchedModifications;
-    }
-
-    private <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
-        List<T> captured = new ArrayList<>();
-        for(T c: captor.getAllValues()) {
-            if(type.isInstance(c)) {
-                captured.add(c);
-            }
-        }
-
-        return captured;
-    }
-
-    private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), expected);
-    }
-
-    private void verifyBatchedModifications(Object message, Modification... expected) {
-        assertEquals("Message type", BatchedModifications.class, message.getClass());
-        BatchedModifications batchedModifications = (BatchedModifications)message;
-        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
-        for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
-            Modification actual = batchedModifications.getModifications().get(i);
-            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
-            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
-                    ((AbstractModification)actual).getPath());
-            if(actual instanceof WriteModification) {
-                assertEquals("getData", ((WriteModification)expected[i]).getData(),
-                        ((WriteModification)actual).getData());
-            }
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
new file mode 100644 (file)
index 0000000..08c32c9
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+/**
+ * Unit tests for backwards compatibility with pre-Lithium versions.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest {
+
+    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+            @Override
+            public boolean matches(Object argument) {
+                if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
+                    WriteData obj = WriteData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
+                }
+
+                return false;
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+            @Override
+            public boolean matches(Object argument) {
+                if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
+                    MergeData obj = MergeData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
+                }
+
+                return false;
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
+        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
+                       DeleteData.fromSerializable(argument).getPath().equals(expPath);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
+
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(TestModel.TEST_PATH));
+
+        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
+
+        doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
+
+        doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
+
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+                get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+        transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.merge(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        return actorRef;
+    }
+
+    @Test
+    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
+        verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
+    @Test
+    public void testCompatibilityWithHeliumR1Version() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+}