Handle 3PC message backwards compatibility 86/33386/4
authorTom Pantelis <tpanteli@brocade.com>
Fri, 22 Jan 2016 19:03:04 +0000 (14:03 -0500)
committerTony Tkacik <ttkacik@cisco.com>
Mon, 1 Feb 2016 16:52:52 +0000 (16:52 +0000)
Modified the ThreePhaseCommitCohortProxy to send the appropriate 3PC
messages based on the backend cohort actor's version. The version is
supplied via a new CohortInfo class which also holds the existing cohort
ActorSelection Future. The caller actually specifies a Supplier to
obtain the version b/c the version may not be known until the
ActorSelection Future is known. The TransactionProxy passes a Supplier
which obtains the version from the TransactionContext. As a result, the
ThreePhaseCommitCohortProxy was refactored a bit to handle the CohortInfo
instances.

The ThreePhaseCommitCohortProxyTest was refactored to use real actors for
the cohorts instead of mocking messages via a mocked ActorContext.

Modified the ShardCommitCoordinator to return the appropriate versioned
replies.

Change-Id: I13e6acf81c1fe8abda43c30b1a73648e411ab500
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronShardTest.java

index 97a0205ff2b7bc1a94bfe88c23e26a642b2a0866..7b25abbe836fd0cd655ad3c24d6d7b3f881bac45 100644 (file)
@@ -17,9 +17,16 @@ abstract class AbstractTransactionContext implements TransactionContext {
     private final TransactionIdentifier transactionIdentifier;
     private long modificationCount = 0;
     private boolean handOffComplete;
     private final TransactionIdentifier transactionIdentifier;
     private long modificationCount = 0;
     private boolean handOffComplete;
+    private final short transactionVersion;
 
     protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
 
     protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
+        this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier,
+            short transactionVersion) {
         this.transactionIdentifier = transactionIdentifier;
         this.transactionIdentifier = transactionIdentifier;
+        this.transactionVersion = transactionVersion;
     }
 
     /**
     }
 
     /**
@@ -52,4 +59,9 @@ abstract class AbstractTransactionContext implements TransactionContext {
     public boolean usesOperationLimiting() {
         return false;
     }
     public boolean usesOperationLimiting() {
         return false;
     }
+
+    @Override
+    public short getTransactionVersion() {
+        return transactionVersion;
+    }
 }
 }
index d0bef0000e7e1a72432f914c9e25d906196faea8..67916cf1d2c4871fd759c8281c3908f2af5d31e8 100644 (file)
@@ -36,7 +36,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
     private final ActorContext actorContext;
     private final ActorSelection actor;
 
     private final ActorContext actorContext;
     private final ActorSelection actor;
-    private final short remoteTransactionVersion;
     private final OperationLimiter limiter;
 
     private BatchedModifications batchedModifications;
     private final OperationLimiter limiter;
 
     private BatchedModifications batchedModifications;
@@ -44,11 +43,10 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
     protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
             ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
 
     protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
             ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
-        super(identifier);
+        super(identifier, remoteTransactionVersion);
         this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
         this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
-        this.remoteTransactionVersion = remoteTransactionVersion;
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
@@ -64,10 +62,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return actorContext;
     }
 
         return actorContext;
     }
 
-    protected short getRemoteTransactionVersion() {
-        return remoteTransactionVersion;
-    }
-
     protected Future<Object> executeOperationAsync(SerializableMessage msg) {
         return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
     }
     protected Future<Object> executeOperationAsync(SerializableMessage msg) {
         return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
     }
@@ -77,7 +71,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         LOG.debug("Tx {} closeTransaction called", getIdentifier());
         TransactionContextCleanup.untrack(this);
 
         LOG.debug("Tx {} closeTransaction called", getIdentifier());
         TransactionContextCleanup.untrack(this);
 
-        actorContext.sendOperationAsync(getActor(), new CloseTransaction(remoteTransactionVersion).toSerializable());
+        actorContext.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable());
     }
 
     @Override
     }
 
     @Override
@@ -115,7 +109,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     private BatchedModifications newBatchedModifications() {
     }
 
     private BatchedModifications newBatchedModifications() {
-        return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId());
+        return new BatchedModifications(getIdentifier().toString(), getTransactionVersion(),
+                getIdentifier().getChainId());
     }
 
     private void batchModification(Modification modification) {
     }
 
     private void batchModification(Modification modification) {
@@ -206,7 +201,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             }
         };
 
             }
         };
 
-        Future<Object> future = executeOperationAsync(readCmd.asVersion(remoteTransactionVersion));
+        Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()));
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
index 18be295dbba105783fe398def057036c92659451..74e25323bc711434dac93efe71594defc761f3f0 100644 (file)
@@ -347,7 +347,7 @@ public class Shard extends RaftActor {
         try {
             cohortEntry.commit();
 
         try {
             cohortEntry.commit();
 
-            sender.tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).toSerializable(), getSelf());
+            sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -384,7 +384,8 @@ public class Shard extends RaftActor {
                     LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
                 }
 
                     LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
                 }
 
-                sender.tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).toSerializable(), getSelf());
+                sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
+                        getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
index 0c6012b9a2b807b18f6afe3af6ff5693ba684dbc..51d8d5caec18e0c94a22520fb23d1b9708acdbf4 100644 (file)
@@ -135,7 +135,7 @@ class ShardCommitCoordinator {
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
         ShardDataTreeCohort cohort = ready.getTransaction().ready();
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
         ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
+        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -169,7 +169,7 @@ class ShardCommitCoordinator {
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
-                        batched.getTransactionChainID()));
+                        batched.getTransactionChainID()), batched.getVersion());
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
 
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
 
@@ -227,7 +227,8 @@ class ShardCommitCoordinator {
     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
-        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+                DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
@@ -321,8 +322,9 @@ class ShardCommitCoordinator {
             } else {
                 // FIXME - use caller's version
                 cohortEntry.getReplySender().tell(
             } else {
                 // FIXME - use caller's version
                 cohortEntry.getReplySender().tell(
-                        canCommit ? CanCommitTransactionReply.yes(DataStoreVersions.CURRENT_VERSION).toSerializable() :
-                            CanCommitTransactionReply.no(DataStoreVersions.CURRENT_VERSION).toSerializable(), cohortEntry.getShard().self());
+                        canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
+                            CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
+                        cohortEntry.getShard().self());
             }
         } catch (Exception e) {
             log.debug("{}: An exception occurred during canCommit", name, e);
             }
         } catch (Exception e) {
             log.debug("{}: An exception occurred during canCommit", name, e);
@@ -421,7 +423,7 @@ class ShardCommitCoordinator {
             shard.getShardMBean().incrementAbortTransactionsCount();
 
             if(sender != null) {
             shard.getShardMBean().incrementAbortTransactionsCount();
 
             if(sender != null) {
-                sender.tell(new AbortTransactionReply().toSerializable(), self);
+                sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
             }
         } catch (Exception e) {
             log.error("{}: An exception happened during abort", name, e);
             }
         } catch (Exception e) {
             log.error("{}: An exception happened during abort", name, e);
@@ -587,16 +589,19 @@ class ShardCommitCoordinator {
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
         private boolean aborted;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
         private boolean aborted;
+        private final short clientVersion;
 
 
-        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
             this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
             this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
+            this.clientVersion = clientVersion;
         }
 
         }
 
-        CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
             this.transactionID = transactionID;
             this.cohort = cohort;
             this.transaction = null;
             this.transactionID = transactionID;
             this.cohort = cohort;
             this.transaction = null;
+            this.clientVersion = clientVersion;
         }
 
         void updateLastAccessTime() {
         }
 
         void updateLastAccessTime() {
@@ -608,6 +613,10 @@ class ShardCommitCoordinator {
             return transactionID;
         }
 
             return transactionID;
         }
 
+        short getClientVersion() {
+            return clientVersion;
+        }
+
         DataTreeCandidate getCandidate() {
             return cohort.getCandidate();
         }
         DataTreeCandidate getCandidate() {
             return cohort.getCandidate();
         }
index 0823c902ae4947ede13b3807a70a04cc46546704..9c17bc1a476c3c95060498656df9061a8312bcab 100644 (file)
@@ -21,8 +21,7 @@ import scala.concurrent.Future;
 /**
  * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
  * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
 /**
  * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
  * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
- * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a
- * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions).
+ * shard as an optimization.
  *
  * @author Thomas Pantelis
  */
  *
  * @author Thomas Pantelis
  */
index 1e59f0d0970cd247ce4fa4b5c62c9ad1c2d0323d..9f94731b59a65370e0aa9bf7784e202aab9e992d 100644 (file)
@@ -9,14 +9,18 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -27,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
-import scala.runtime.AbstractFunction1;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -36,42 +39,82 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
+    private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
+        @Override
+        public Object newMessage(String transactionId, short version) {
+            return new CommitTransaction(transactionId, version).toSerializable();
+        }
+
+        @Override
+        public boolean isSerializedReplyType(Object reply) {
+            return CommitTransactionReply.isSerializedType(reply);
+        }
+    };
+
+    private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
+        @Override
+        public Object newMessage(String transactionId, short version) {
+            return new AbortTransaction(transactionId, version).toSerializable();
+        }
+
+        @Override
+        public boolean isSerializedReplyType(Object reply) {
+            return AbortTransactionReply.isSerializedType(reply);
+        }
+    };
+
     private final ActorContext actorContext;
     private final ActorContext actorContext;
-    private final List<Future<ActorSelection>> cohortFutures;
-    private volatile List<ActorSelection> cohorts;
+    private final List<CohortInfo> cohorts;
+    private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
     private final String transactionId;
     private volatile OperationCallback commitOperationCallback;
 
     private final String transactionId;
     private volatile OperationCallback commitOperationCallback;
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
-            List<Future<ActorSelection>> cohortFutures, String transactionId) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts, String transactionId) {
         this.actorContext = actorContext;
         this.actorContext = actorContext;
-        this.cohortFutures = cohortFutures;
+        this.cohorts = cohorts;
         this.transactionId = transactionId;
         this.transactionId = transactionId;
-    }
 
 
-    private Future<Void> buildCohortList() {
+        if(cohorts.isEmpty()) {
+            cohortsResolvedFuture.set(null);
+        }
+    }
 
 
-        Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
-                actorContext.getClientDispatcher());
+    private ListenableFuture<Void> resolveCohorts() {
+        if(cohortsResolvedFuture.isDone()) {
+            return cohortsResolvedFuture;
+        }
 
 
-        return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
-            @Override
-            public Void apply(Iterable<ActorSelection> actorSelections) {
-                cohorts = Lists.newArrayList(actorSelections);
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Tx {} successfully built cohort path list: {}",
-                        transactionId, cohorts);
+        final AtomicInteger completed = new AtomicInteger(cohorts.size());
+        for(final CohortInfo info: cohorts) {
+            info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
+                @Override
+                public void onComplete(Throwable failure, ActorSelection actor)  {
+                    synchronized(completed) {
+                        boolean done = completed.decrementAndGet() == 0;
+                        if(failure != null) {
+                            LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
+                            cohortsResolvedFuture.setException(failure);
+                        } else if(!cohortsResolvedFuture.isDone()) {
+                            LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
+
+                            info.setResolvedActor(actor);
+                            if(done) {
+                                LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
+                                cohortsResolvedFuture.set(null);
+                            }
+                        }
+                    }
                 }
                 }
-                return null;
-            }
-        }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+            }, actorContext.getClientDispatcher());
+        }
+
+        return cohortsResolvedFuture;
     }
 
     @Override
     public ListenableFuture<Boolean> canCommit() {
     }
 
     @Override
     public ListenableFuture<Boolean> canCommit() {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Tx {} canCommit", transactionId);
-        }
+        LOG.debug("Tx {} canCommit", transactionId);
+
         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
         // The first phase of canCommit is to gather the list of cohort actor paths that will
         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
         // The first phase of canCommit is to gather the list of cohort actor paths that will
@@ -80,53 +123,42 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
 
         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
 
-        buildCohortList().onComplete(new OnComplete<Void>() {
+        Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
             @Override
             @Override
-            public void onComplete(Throwable failure, Void notUsed) throws Throwable {
-                if(failure != null) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
-                    }
-                    returnFuture.setException(failure);
-                } else {
-                    finishCanCommit(returnFuture);
-                }
+            public void onSuccess(Void notUsed) {
+                finishCanCommit(returnFuture);
             }
             }
-        }, actorContext.getClientDispatcher());
+
+            @Override
+            public void onFailure(Throwable failure) {
+                returnFuture.setException(failure);
+            }
+        });
 
         return returnFuture;
     }
 
     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
 
         return returnFuture;
     }
 
     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Tx {} finishCanCommit", transactionId);
-        }
+        LOG.debug("Tx {} finishCanCommit", transactionId);
 
         // For empty transactions return immediately
         if(cohorts.size() == 0){
 
         // For empty transactions return immediately
         if(cohorts.size() == 0){
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
-            }
+            LOG.debug("Tx {}: canCommit returning result true", transactionId);
             returnFuture.set(Boolean.TRUE);
             return;
         }
 
             returnFuture.set(Boolean.TRUE);
             return;
         }
 
-        commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
-            new TransactionRateLimitingCallback(actorContext);
-
+        commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
         commitOperationCallback.run();
 
         commitOperationCallback.run();
 
-        final Object message = new CanCommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable();
-
-        final Iterator<ActorSelection> iterator = cohorts.iterator();
+        final Iterator<CohortInfo> iterator = cohorts.iterator();
 
         final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
 
         final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) throws Throwable {
+            public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
                 if (failure != null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
-                    }
+                    LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
+
                     returnFuture.setException(failure);
                     commitOperationCallback.failure();
                     return;
                     returnFuture.setException(failure);
                     commitOperationCallback.failure();
                     return;
@@ -138,8 +170,10 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
 
                 boolean result = true;
                 if (CanCommitTransactionReply.isSerializedType(response)) {
 
                 boolean result = true;
                 if (CanCommitTransactionReply.isSerializedType(response)) {
-                    CanCommitTransactionReply reply =
-                            CanCommitTransactionReply.fromSerializable(response);
+                    CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
+
+                    LOG.debug("Tx {}: received {}", transactionId, response);
+
                     if (!reply.getCanCommit()) {
                         result = false;
                     }
                     if (!reply.getCanCommit()) {
                         result = false;
                     }
@@ -150,35 +184,45 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                     return;
                 }
 
                     return;
                 }
 
-                if(iterator.hasNext() && result){
-                    Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
-                            actorContext.getTransactionCommitOperationTimeout());
-                    future.onComplete(this, actorContext.getClientDispatcher());
+                if(iterator.hasNext() && result) {
+                    sendCanCommitTransaction(iterator.next(), this);
                 } else {
                 } else {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
-                    }
+                    LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
                     returnFuture.set(Boolean.valueOf(result));
                 }
 
             }
         };
 
                     returnFuture.set(Boolean.valueOf(result));
                 }
 
             }
         };
 
-        Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
-                actorContext.getTransactionCommitOperationTimeout());
+        sendCanCommitTransaction(iterator.next(), onComplete);
+    }
+
+    private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
+        CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
+        }
+
+        Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
+                message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
-    private Future<Iterable<Object>> invokeCohorts(Object message) {
+    private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
-        for(ActorSelection cohort : cohorts) {
+        for(CohortInfo cohort : cohorts) {
+            Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
+
             if(LOG.isDebugEnabled()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
+                LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
             }
             }
-            futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
+
+            futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
+                    actorContext.getTransactionCommitOperationTimeout()));
         }
 
         }
 
-        return Futures.sequence(futureList, actorContext.getClientDispatcher());
+        return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
     }
 
     @Override
     }
 
     @Override
@@ -196,8 +240,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         // exception then that exception will supersede and suppress the original exception. But
         // it's the original exception that is the root cause and of more interest to the client.
 
         // exception then that exception will supersede and suppress the original exception. But
         // it's the original exception that is the root cause and of more interest to the client.
 
-        return voidOperation("abort", new AbortTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
-                AbortTransactionReply.class, false);
+        return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
+                AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
     }
 
     @Override
     }
 
     @Override
@@ -205,88 +249,86 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
             OperationCallback.NO_OP_CALLBACK;
 
         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
             OperationCallback.NO_OP_CALLBACK;
 
-        return voidOperation("commit", new CommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
+        return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
                 CommitTransactionReply.class, true, operationCallback);
     }
 
                 CommitTransactionReply.class, true, operationCallback);
     }
 
-    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
-                                                 final Class<?> expectedResponseClass, final boolean propagateException) {
-        return voidOperation(operationName, message, expectedResponseClass, propagateException,
-                OperationCallback.NO_OP_CALLBACK);
+    private boolean successfulFuture(ListenableFuture<Void> future) {
+        if(!future.isDone()) {
+            return false;
+        }
+
+        try {
+            future.get();
+            return true;
+        } catch(Exception e) {
+            return false;
+        }
     }
 
     }
 
-    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
-                                                 final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
+    private ListenableFuture<Void> voidOperation(final String operationName,
+            final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
+            final boolean propagateException, final OperationCallback callback) {
+        LOG.debug("Tx {} {}", transactionId, operationName);
 
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Tx {} {}", transactionId, operationName);
-        }
         final SettableFuture<Void> returnFuture = SettableFuture.create();
 
         // The cohort actor list should already be built at this point by the canCommit phase but,
         // if not for some reason, we'll try to build it here.
 
         final SettableFuture<Void> returnFuture = SettableFuture.create();
 
         // The cohort actor list should already be built at this point by the canCommit phase but,
         // if not for some reason, we'll try to build it here.
 
-        if(cohorts != null) {
-            finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
+        ListenableFuture<Void> future = resolveCohorts();
+        if(successfulFuture(future)) {
+            finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
                     returnFuture, callback);
         } else {
                     returnFuture, callback);
         } else {
-            buildCohortList().onComplete(new OnComplete<Void>() {
+            Futures.addCallback(future, new FutureCallback<Void>() {
                 @Override
                 @Override
-                public void onComplete(Throwable failure, Void notUsed) throws Throwable {
-                    if(failure != null) {
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
-                                operationName, failure);
-                        }
-                        if(propagateException) {
-                            returnFuture.setException(failure);
-                        } else {
-                            returnFuture.set(null);
-                        }
+                public void onSuccess(Void notUsed) {
+                    finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
+                            propagateException, returnFuture, callback);
+                }
+
+                @Override
+                public void onFailure(Throwable failure) {
+                    LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
+
+                    if(propagateException) {
+                        returnFuture.setException(failure);
                     } else {
                     } else {
-                        finishVoidOperation(operationName, message, expectedResponseClass,
-                                propagateException, returnFuture, callback);
+                        returnFuture.set(null);
                     }
                 }
                     }
                 }
-            }, actorContext.getClientDispatcher());
+            });
         }
 
         return returnFuture;
     }
 
         }
 
         return returnFuture;
     }
 
-    private void finishVoidOperation(final String operationName, final Object message,
+    private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier,
                                      final Class<?> expectedResponseClass, final boolean propagateException,
                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
                                      final Class<?> expectedResponseClass, final boolean propagateException,
                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Tx {} finish {}", transactionId, operationName);
-        }
+        LOG.debug("Tx {} finish {}", transactionId, operationName);
 
         callback.resume();
 
 
         callback.resume();
 
-        Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+        Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
 
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
 
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
-
                 Throwable exceptionToPropagate = failure;
                 if(exceptionToPropagate == null) {
                     for(Object response: responses) {
                         if(!response.getClass().equals(expectedResponseClass)) {
                             exceptionToPropagate = new IllegalArgumentException(
                 Throwable exceptionToPropagate = failure;
                 if(exceptionToPropagate == null) {
                     for(Object response: responses) {
                         if(!response.getClass().equals(expectedResponseClass)) {
                             exceptionToPropagate = new IllegalArgumentException(
-                                    String.format("Unexpected response type %s",
-                                            response.getClass()));
+                                    String.format("Unexpected response type %s", response.getClass()));
                             break;
                         }
                     }
                 }
 
                 if(exceptionToPropagate != null) {
                             break;
                         }
                     }
                 }
 
                 if(exceptionToPropagate != null) {
-
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
-                            operationName, exceptionToPropagate);
-                    }
+                    LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
                     if(propagateException) {
                         // We don't log the exception here to avoid redundant logging since we're
                         // propagating to the caller in MD-SAL core who will log it.
                     if(propagateException) {
                         // We don't log the exception here to avoid redundant logging since we're
                         // propagating to the caller in MD-SAL core who will log it.
@@ -295,19 +337,13 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                         // Since the caller doesn't want us to propagate the exception we'll also
                         // not log it normally. But it's usually not good to totally silence
                         // exceptions so we'll log it to debug level.
                         // Since the caller doesn't want us to propagate the exception we'll also
                         // not log it normally. But it's usually not good to totally silence
                         // exceptions so we'll log it to debug level.
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
-                                exceptionToPropagate);
-                        }
                         returnFuture.set(null);
                     }
 
                     callback.failure();
                 } else {
                         returnFuture.set(null);
                     }
 
                     callback.failure();
                 } else {
+                    LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
 
 
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
-                    }
                     returnFuture.set(null);
 
                     callback.success();
                     returnFuture.set(null);
 
                     callback.success();
@@ -318,6 +354,45 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
 
     @Override
     List<Future<ActorSelection>> getCohortFutures() {
 
     @Override
     List<Future<ActorSelection>> getCohortFutures() {
-        return Collections.unmodifiableList(cohortFutures);
+        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
+        for(CohortInfo info: cohorts) {
+            cohortFutures.add(info.getActorFuture());
+        }
+
+        return cohortFutures;
+    }
+
+    static class CohortInfo {
+        private final Future<ActorSelection> actorFuture;
+        private volatile ActorSelection resolvedActor;
+        private final Supplier<Short> actorVersionSupplier;
+
+        CohortInfo(Future<ActorSelection> actorFuture, Supplier<Short> actorVersionSupplier) {
+            this.actorFuture = actorFuture;
+            this.actorVersionSupplier = actorVersionSupplier;
+        }
+
+        Future<ActorSelection> getActorFuture() {
+            return actorFuture;
+        }
+
+        ActorSelection getResolvedActor() {
+            return resolvedActor;
+        }
+
+        void setResolvedActor(ActorSelection resolvedActor) {
+            this.resolvedActor = resolvedActor;
+        }
+
+        short getActorVersion() {
+            Preconditions.checkState(resolvedActor != null,
+                    "getActorVersion cannot be called until the actor is resolved");
+            return actorVersionSupplier.get();
+        }
+    }
+
+    private interface MessageSupplier {
+        Object newMessage(String transactionId, short version);
+        boolean isSerializedReplyType(Object reply);
     }
 }
     }
 }
index ab636ff493c724c3daa4f69eea6d5fb7d591847c..ca03c3d60df7632792a99f15c20cb2c84eb0f35b 100644 (file)
@@ -45,4 +45,6 @@ interface TransactionContext {
      * @return
      */
     boolean usesOperationLimiting();
      * @return
      */
     boolean usesOperationLimiting();
+
+    short getTransactionVersion();
 }
 }
index f645608dd9b96c327153c804f544a0b2eacb16b3..5dcba758f6a1129fc51581a035728abb64503124 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
@@ -237,6 +238,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
     }
 
         return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextWrapper) {
 
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextWrapper) {
 
@@ -281,14 +283,27 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextWrapperEntries) {
 
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextWrapperEntries) {
 
-        final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextWrapperEntries.size());
+        final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrapperEntries.size());
         for (Entry<String, TransactionContextWrapper> e : txContextWrapperEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
         for (Entry<String, TransactionContextWrapper> e : txContextWrapperEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-            cohortFutures.add(e.getValue().readyTransaction());
+            final TransactionContextWrapper wrapper = e.getValue();
+
+            // The remote tx version is obtained the via TransactionContext which may not be available yet so
+            // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
+            // TransactionContext is available.
+            Supplier<Short> txVersionSupplier = new Supplier<Short>() {
+                @Override
+                public Short get() {
+                    return wrapper.getTransactionContext().getTransactionVersion();
+                }
+            };
+
+            cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
         }
 
         }
 
-        return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
+        return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts,
+                getIdentifier().toString());
     }
 
     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
     }
 
     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
index 2442548f13551cec05ad09d90298716cd73fcda3..17dec1911eb3a531ca69800cc924107c35518cac 100644 (file)
@@ -25,6 +25,7 @@ import akka.japi.Creator;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
@@ -274,6 +275,15 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         }
     }
 
         }
     }
 
+    protected ShardDataTreeCohort mockShardDataTreeCohort() {
+        ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class);
+        doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+        doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+        doReturn(mockCandidate("candidate")).when(cohort).getCandidate();
+        return cohort;
+    }
+
     static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
         ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
         doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
     static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
         ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
         doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
@@ -420,7 +430,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
     }
 
         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
     }
 
-    static DataTreeCandidateTip mockCandidate(final String name) {
+    public static DataTreeCandidateTip mockCandidate(final String name) {
         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
index 499d5e22b99463245747d679d4244d1a4725c0b3..55024b265aea8cb9b9d29d6a667207b51f669e90 100644 (file)
@@ -26,6 +26,7 @@ import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -539,6 +540,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
     }
 
         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
@@ -592,8 +594,11 @@ public class DistributedDataStoreRemotingIntegrationTest {
         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
+        Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+        Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(
+                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
@@ -601,6 +606,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
@@ -656,8 +662,11 @@ public class DistributedDataStoreRemotingIntegrationTest {
         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
+        Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+        Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(
+                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
index 0ab92dda893c99e52fe548079f37e89fa6ba6f1d..e5db5cbbaf50950c69063d3f43bb8ef3894e87bd 100644 (file)
@@ -9,41 +9,46 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.Dispatchers;
 import akka.dispatch.Futures;
 import akka.dispatch.Futures;
-import akka.util.Timeout;
+import akka.testkit.TestActorRef;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
-import com.google.common.collect.Lists;
+import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.stubbing.Stubber;
+import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
@@ -51,12 +56,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     static class TestException extends RuntimeException {
     }
 
     static class TestException extends RuntimeException {
     }
 
-    @Mock
     private ActorContext actorContext;
 
     private ActorContext actorContext;
 
-    @Mock
-    private DatastoreContext datastoreContext;
-
     @Mock
     private Timer commitTimer;
 
     @Mock
     private Timer commitTimer;
 
@@ -66,15 +67,27 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Mock
     private Snapshot commitSnapshot;
 
     @Mock
     private Snapshot commitSnapshot;
 
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+    private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
-        doReturn(getSystem()).when(actorContext).getActorSystem();
-        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
-        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
-        doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
-        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
+                new MockClusterWrapper(), new MockConfiguration(),
+                DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) {
+                    @Override
+                    public Timer getOperationTimer(String operationName) {
+                        return commitTimer;
+                    }
+
+                    @Override
+                    public double getTxCreationLimit() {
+                        return 10.0;
+                    }
+                };
+
         doReturn(commitTimerContext).when(commitTimer).time();
         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
         for(int i=1;i<11;i++){
         doReturn(commitTimerContext).when(commitTimer).time();
         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
         for(int i=1;i<11;i++){
@@ -82,274 +95,353 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
         }
             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
         }
-        doReturn(10.0).when(actorContext).getTxCreationLimit();
-    }
-
-    private Future<ActorSelection> newCohort() {
-        ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
-        ActorSelection actorSelection = getSystem().actorSelection(path);
-        return Futures.successful(actorSelection);
-    }
-
-    private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-        for(int i = 1; i <= nCohorts; i++) {
-            cohortFutures.add(newCohort());
-        }
-
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
     }
 
     }
 
-    private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
-            throws Exception {
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-        cohortFutures.add(newCohort());
-        cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
-
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
-    }
-
-    private void setupMockActorContext(Class<?> requestType, Object... responses) {
-        Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
-                .failed((Throwable) responses[0]) : Futures
-                .successful(((SerializableMessage) responses[0]).toSerializable()));
-
-        for(int i = 1; i < responses.length; i++) {
-            stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
-                    .failed((Throwable) responses[i]) : Futures
-                    .successful(((SerializableMessage) responses[i]).toSerializable()));
-        }
-
-        stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
-                isA(requestType), any(Timeout.class));
-
-        doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
-                .when(actorContext).getTransactionCommitOperationTimeout();
-    }
-
-    private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
-        verify(actorContext, times(nCohorts)).executeOperationAsync(
-                any(ActorSelection.class), isA(requestType), any(Timeout.class));
-    }
-
-    private static void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+    @Test
+    public void testCanCommitYesWithOneCohort() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
 
 
-        try {
-            future.get(5, TimeUnit.SECONDS);
-            fail("Expected ExecutionException");
-        } catch(ExecutionException e) {
-            throw e.getCause();
-        }
+        verifyCanCommit(proxy.canCommit(), true);
+        verifyCohortActors();
     }
 
     @Test
     }
 
     @Test
-    public void testCanCommitWithOneCohort() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION));
-
-        ListenableFuture<Boolean> future = proxy.canCommit();
-
-        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
+    public void testCanCommitNoWithOneCohort() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
 
 
-        setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION));
-
-        future = proxy.canCommit();
-
-        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
-
-        verifyCohortInvocations(2, CanCommitTransaction.class);
+        verifyCanCommit(proxy.canCommit(), false);
+        verifyCohortActors();
     }
 
     @Test
     }
 
     @Test
-    public void testCanCommitWithMultipleCohorts() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CanCommitTransaction.class,
-                CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
-
-        ListenableFuture<Boolean> future = proxy.canCommit();
-
-        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
-
-        verifyCohortInvocations(2, CanCommitTransaction.class);
+    public void testCanCommitYesWithTwoCohorts() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION))));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifyCohortActors();
     }
 
     @Test
     }
 
     @Test
-    public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(3);
-
-        setupMockActorContext(CanCommitTransaction.class,
-                CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION),
-                CanCommitTransactionReply.yes(CURRENT_VERSION));
-
-        ListenableFuture<Boolean> future = proxy.canCommit();
-
-        Boolean actual = future.get(5, TimeUnit.SECONDS);
-
-        assertEquals("canCommit", false, actual);
-
-        verifyCohortInvocations(2, CanCommitTransaction.class);
+    public void testCanCommitNoWithThreeCohorts() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                        CanCommitTransactionReply.no(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1")));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), false);
+        verifyCohortActors();
     }
 
     @Test(expected = TestException.class)
     public void testCanCommitWithExceptionFailure() throws Throwable {
     }
 
     @Test(expected = TestException.class)
     public void testCanCommitWithExceptionFailure() throws Throwable {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
 
 
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+        propagateExecutionExceptionCause(proxy.canCommit());
+    }
 
 
-        setupMockActorContext(CanCommitTransaction.class, new TestException());
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanCommitWithInvalidResponseType() throws Throwable {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
 
         propagateExecutionExceptionCause(proxy.canCommit());
     }
 
 
         propagateExecutionExceptionCause(proxy.canCommit());
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCanCommitWithInvalidResponseType() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+    @Test(expected = TestException.class)
+    public void testCanCommitWithFailedCohortFuture() throws Throwable {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1")),
+                newCohortInfoWithFailedFuture(new TestException()),
+                newCohortInfo(new CohortActor.Builder("txn-1")));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
 
 
-        setupMockActorContext(CanCommitTransaction.class,
-                new CommitTransactionReply());
+        propagateExecutionExceptionCause(proxy.canCommit());
+    }
 
 
-        proxy.canCommit().get(5, TimeUnit.SECONDS);
+    @Test
+    public void testAllThreePhasesSuccessful() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        verifySuccessfulFuture(proxy.commit());
+        verifyCohortActors();
     }
 
     @Test(expected = TestException.class)
     }
 
     @Test(expected = TestException.class)
-    public void testCanCommitWithFailedCohortPath() throws Throwable {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
-
-        try {
-            propagateExecutionExceptionCause(proxy.canCommit());
-        } finally {
-            verifyCohortInvocations(0, CanCommitTransaction.class);
-        }
+    public void testCommitWithExceptionFailure() throws Throwable {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit(new TestException())));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        propagateExecutionExceptionCause(proxy.commit());
     }
 
     }
 
-    @Test
-    public void testPreCommit() throws Exception {
-        // Precommit is currently a no-op
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+    @Test(expected = IllegalArgumentException.class)
+    public void testCommitWithInvalidResponseType() throws Throwable {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+                        expectCommit("invalid"))), "txn-1");
 
 
-        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        propagateExecutionExceptionCause(proxy.commit());
     }
 
     @Test
     public void testAbort() throws Exception {
     }
 
     @Test
     public void testAbort() throws Exception {
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(AbortTransaction.class, new AbortTransactionReply());
-
-        proxy.abort().get(5, TimeUnit.SECONDS);
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
+                        AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
 
 
-        verifyCohortInvocations(1, AbortTransaction.class);
+        verifySuccessfulFuture(proxy.abort());
+        verifyCohortActors();
     }
 
     @Test
     public void testAbortWithFailure() throws Exception {
     }
 
     @Test
     public void testAbortWithFailure() throws Exception {
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
-        setupMockActorContext(AbortTransaction.class, new RuntimeException("mock"));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
 
         // The exception should not get propagated.
 
         // The exception should not get propagated.
-        proxy.abort().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(1, AbortTransaction.class);
+        verifySuccessfulFuture(proxy.abort());
+        verifyCohortActors();
     }
 
     @Test
     }
 
     @Test
-    public void testAbortWithFailedCohortPath() throws Throwable {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
-
-        // The exception should not get propagated.
-        proxy.abort().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(0, AbortTransaction.class);
+    public void testAbortWithFailedCohortFuture() throws Throwable {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfoWithFailedFuture(new TestException()),
+                newCohortInfo(new CohortActor.Builder("txn-1")));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifySuccessfulFuture(proxy.abort());
+        verifyCohortActors();
     }
 
     @Test
     }
 
     @Test
-    public void testCommit() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
-                new CommitTransactionReply());
-
-        proxy.commit().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(2, CommitTransaction.class);
+    public void testWithNoCohorts() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
+                Collections.<CohortInfo>emptyList(), "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        verifySuccessfulFuture(proxy.commit());
+        verifyCohortActors();
     }
 
     }
 
-    @Test(expected = TestException.class)
-    public void testCommitWithFailure() throws Throwable {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
-                new TestException());
-
-        propagateExecutionExceptionCause(proxy.commit());
+    @Test
+    public void testBackwardsCompatibilityWithPreBoron() throws Exception {
+        List<CohortInfo> cohorts = Arrays.asList(
+                newCohortInfo(new CohortActor.Builder("txn-1").
+                        expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
+                                CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
+                        expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
+                                CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
+                        DataStoreVersions.LITHIUM_VERSION));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+        verifyCanCommit(proxy.canCommit(), true);
+        verifySuccessfulFuture(proxy.preCommit());
+        verifySuccessfulFuture(proxy.commit());
+        verifyCohortActors();
     }
 
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCommitWithInvalidResponseType() throws Exception {
+    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
 
 
-        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Expected ExecutionException");
+        } catch(ExecutionException e) {
+            verifyCohortActors();
+            throw e.getCause();
+        }
+    }
 
 
-        setupMockActorContext(CommitTransaction.class, new AbortTransactionReply());
+    private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
+        TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
+                withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
+        cohortActors.add(actor);
+        return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
+            @Override
+            public Short get() {
+                return version;
+            }
+        });
+    }
 
 
-        proxy.commit().get(5, TimeUnit.SECONDS);
+    private CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
+        return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
+            @Override
+            public Short get() {
+                return CURRENT_VERSION;
+            }
+        });
     }
 
     }
 
-    @Test(expected = TestException.class)
-    public void testCommitWithFailedCohortPath() throws Throwable {
+    private CohortInfo newCohortInfo(CohortActor.Builder builder) {
+        return newCohortInfo(builder, CURRENT_VERSION);
+    }
 
 
-        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+    private void verifyCohortActors() {
+        for(TestActorRef<CohortActor> actor: cohortActors) {
+            actor.underlyingActor().verify();
+        }
+    }
 
 
+    private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
         try {
         try {
-            propagateExecutionExceptionCause(proxy.commit());
-        } finally {
-
-            verifyCohortInvocations(0, CommitTransaction.class);
+            return future.get(5, TimeUnit.SECONDS);
+        } catch(Exception e) {
+            verifyCohortActors();
+            throw e;
         }
         }
-
     }
 
     }
 
-    @Test
-    public void testAllThreePhasesSuccessful() throws Exception {
-
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(CanCommitTransaction.class,
-                CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
-
-        setupMockActorContext(CommitTransaction.class,
-                new CommitTransactionReply(), new CommitTransactionReply());
+    private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
+        Boolean actual = verifySuccessfulFuture(future);
+        assertEquals("canCommit", expected, actual);
+    }
 
 
-        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+    private static class CohortActor extends UntypedActor {
+        private final Builder builder;
+        private final AtomicInteger canCommitCount = new AtomicInteger();
+        private final AtomicInteger commitCount = new AtomicInteger();
+        private final AtomicInteger abortCount = new AtomicInteger();
+        private volatile AssertionError assertionError;
 
 
-        proxy.canCommit().get(5, TimeUnit.SECONDS);
-        proxy.preCommit().get(5, TimeUnit.SECONDS);
-        proxy.commit().get(5, TimeUnit.SECONDS);
+        private CohortActor(Builder builder) {
+            this.builder = builder;
+        }
 
 
-        verifyCohortInvocations(2, CanCommitTransaction.class);
-        verifyCohortInvocations(2, CommitTransaction.class);
+        @Override
+        public void onReceive(Object message) {
+            if(CanCommitTransaction.isSerializedType(message)) {
+                onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
+                        builder.expCanCommitType, builder.canCommitReply);
+                canCommitCount.incrementAndGet();
+            } else if(CommitTransaction.isSerializedType(message)) {
+                onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
+                        builder.expCommitType, builder.commitReply);
+                commitCount.incrementAndGet();
+            } else if(AbortTransaction.isSerializedType(message)) {
+                onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
+                        builder.expAbortType, builder.abortReply);
+                abortCount.incrementAndGet();
+            } else {
+                assertionError = new AssertionError("Unexpected message " + message);
+            }
+        }
 
 
-    }
+        private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
+                Class<?> expType, Object reply) {
+            try {
+                assertNotNull("Unexpected " + name, expType);
+                assertEquals(name + " type", expType, rawMessage.getClass());
+                assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
+
+                if(reply instanceof Throwable) {
+                    getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
+                } else {
+                    getSender().tell(reply, self());
+                }
+            } catch(AssertionError e) {
+                assertionError = e;
+            }
+        }
 
 
-    @Test
-    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+        void verify() {
+            if(assertionError != null) {
+                throw assertionError;
+            }
 
 
-        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+            if(builder.expCanCommitType != null) {
+                assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
+            }
 
 
-        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+            if(builder.expCommitType != null) {
+                assertEquals("CommitTransaction count", 1, commitCount.get());
+            }
 
 
-        proxy.canCommit().get(5, TimeUnit.SECONDS);
-        proxy.preCommit().get(5, TimeUnit.SECONDS);
-        proxy.commit().get(5, TimeUnit.SECONDS);
+            if(builder.expAbortType != null) {
+                assertEquals("AbortTransaction count", 1, abortCount.get());
+            }
+        }
 
 
+        static class Builder {
+            private Class<?> expCanCommitType;
+            private Class<?> expCommitType;
+            private Class<?> expAbortType;
+            private Object canCommitReply;
+            private Object commitReply;
+            private Object abortReply;
+            private final String transactionId;
+
+            Builder(String transactionId) {
+                this.transactionId = transactionId;
+            }
+
+            Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
+                this.expCanCommitType = expCanCommitType;
+                this.canCommitReply = canCommitReply;
+                return this;
+            }
+
+            Builder expectCanCommit(Object canCommitReply) {
+                return expectCanCommit(CanCommitTransaction.class, canCommitReply);
+            }
+
+            Builder expectCommit(Class<?> expCommitType, Object commitReply) {
+                this.expCommitType = expCommitType;
+                this.commitReply = commitReply;
+                return this;
+            }
+
+            Builder expectCommit(Object commitReply) {
+                return expectCommit(CommitTransaction.class, commitReply);
+            }
+
+            Builder expectAbort(Class<?> expAbortType, Object abortReply) {
+                this.expAbortType = expAbortType;
+                this.abortReply = abortReply;
+                return this;
+            }
+
+            Builder expectAbort(Object abortReply) {
+                return expectAbort(AbortTransaction.class, abortReply);
+            }
+
+            Props props() {
+                return Props.create(CohortActor.class, this);
+            }
+        }
     }
 }
     }
 }
index a6077e23e489d2017f48fd9c54b28a48c99dd03a..4499043b14223739a1aab6920d2f2d215d3771a6 100644 (file)
@@ -7,18 +7,30 @@
  */
 package org.opendaylight.controller.cluster.datastore.compat;
 
  */
 package org.opendaylight.controller.cluster.datastore.compat;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.TransactionType;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.TransactionType;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 /**
  * Shard unit tests for backwards compatibility with pre-Boron versions.
 
 /**
  * Shard unit tests for backwards compatibility with pre-Boron versions.
@@ -47,4 +59,71 @@ public class PreBoronShardTest extends AbstractShardTest {
                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
         }};
     }
                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
         }};
     }
+
+    @Test
+    public void testBatchedModificationsWithCommitOnReady() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsWithCommitOnReady");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx";
+
+            BatchedModifications batched = new BatchedModifications(transactionID,
+                    DataStoreVersions.LITHIUM_VERSION, "");
+            batched.addModification(new WriteModification(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
+            batched.setReady(true);
+            batched.setDoCommitOnReady(true);
+            batched.setTotalMessagesSent(1);
+
+            shard.tell(batched, getRef());
+            expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+        }};
+    }
+
+    @Test
+    public void testImmediateCommitWithForwardedReadyTransaction() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testImmediateCommitWithForwardedReadyTransaction");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx";
+
+            shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID,
+                    DataStoreVersions.LITHIUM_VERSION, true), getRef());
+
+            expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+        }};
+    }
+
+    @Test
+    public void testThreePhaseCommitOnForwardedReadyTransaction() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testThreePhaseCommitOnForwardedReadyTransaction");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx";
+
+            shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID,
+                    DataStoreVersions.LITHIUM_VERSION, false), getRef());
+            expectMsgClass(ReadyTransactionReply.class);
+
+            shard.tell(new CanCommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            shard.tell(new CommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+            expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+        }};
+    }
 }
 }