Fix shard deadlock in 3 nodes 74/72874/11
authorTom Pantelis <tompantelis@gmail.com>
Tue, 12 Jun 2018 11:54:30 +0000 (07:54 -0400)
committerRobert Varga <nite@hq.sk>
Thu, 21 Jun 2018 08:47:27 +0000 (08:47 +0000)
JIRA: CONTROLLER-1836
Change-Id: I10a9cb43bcdb35f66abebb054f37c05e7fda54e7
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
44 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index 89dd59a..42c0541 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.SortedSet;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
@@ -208,7 +209,8 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
             Exception failure);
 
-    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod,
+            Optional<SortedSet<String>> participatingShardNames);
 
     @Override
     public String toString() {
index 19c0627..2cb4dee 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
@@ -87,4 +89,9 @@ final class ChainedCommitCohort extends ShardDataTreeCohort {
     public State getState() {
         return delegate.getState();
     }
+
+    @Override
+    Optional<SortedSet<String>> getParticipatingShardNames() {
+        return delegate.getParticipatingShardNames();
+    }
 }
index a66a496..3d5238e 100644 (file)
@@ -11,6 +11,8 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -109,10 +111,10 @@ final class CohortEntry {
         cohort.abort(callback);
     }
 
-    void ready(final CohortDecorator cohortDecorator) {
+    void ready(final Optional<SortedSet<String>> participatingShardNames, final CohortDecorator cohortDecorator) {
         Preconditions.checkState(cohort == null, "cohort was already set");
 
-        cohort = transaction.ready();
+        cohort = transaction.ready(participatingShardNames);
 
         if (cohortDecorator != null) {
             // Call the hook for unit tests.
@@ -120,6 +122,10 @@ final class CohortEntry {
         }
     }
 
+    Optional<SortedSet<String>> getParticipatingShardNames() {
+        return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty();
+    }
+
     boolean isDoImmediateCommit() {
         return doImmediateCommit;
     }
index 3a24260..0b4115a 100644 (file)
@@ -18,5 +18,6 @@ public interface DataStoreVersions {
     short HELIUM_2_VERSION = 2;
     short LITHIUM_VERSION = 3;
     short BORON_VERSION = 5;
-    short CURRENT_VERSION = BORON_VERSION;
+    short FLUORINE_VERSION = 9;
+    short CURRENT_VERSION = FLUORINE_VERSION;
 }
index 5af7c79..d5fcc5e 100644 (file)
@@ -513,7 +513,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         if (optFailure.isPresent()) {
             state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
         } else {
-            state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification));
+            state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification,
+                    java.util.Optional.empty()));
         }
 
         if (request.isCoordinated()) {
@@ -611,7 +612,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
 
         applyModifications(modifications);
-        state = new Ready(checkOpen().ready());
+        state = new Ready(checkOpen().ready(java.util.Optional.empty()));
         LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
     }
 
index d7c3012..73d2c9f 100644 (file)
@@ -14,6 +14,8 @@ import com.google.common.collect.TreeRangeSet;
 import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -75,7 +77,8 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
-        return chain.createReadyCohort(id, mod);
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod,
+            final Optional<SortedSet<String>> participatingShardNames) {
+        return chain.createReadyCohort(id, mod, participatingShardNames);
     }
 }
index 23207f0..8d00681 100644 (file)
@@ -12,6 +12,8 @@ import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
@@ -59,18 +61,19 @@ class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
         this.modification = null;
     }
 
-    private Future<Object> initiateCommit(final boolean immediate) {
+    private Future<Object> initiateCommit(final boolean immediate,
+            final Optional<SortedSet<String>> participatingShardNames) {
         if (operationError != null) {
             return Futures.failed(operationError);
         }
 
         final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier(),
-                modification, immediate);
+                modification, immediate, participatingShardNames);
         return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
     }
 
-    Future<ActorSelection> initiateCoordinatedCommit() {
-        final Future<Object> messageFuture = initiateCommit(false);
+    Future<ActorSelection> initiateCoordinatedCommit(Optional<SortedSet<String>> participatingShardNames) {
+        final Future<Object> messageFuture = initiateCommit(false, participatingShardNames);
         final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
                 transaction.getIdentifier());
         ret.onComplete(new OnComplete<ActorSelection>() {
@@ -90,7 +93,7 @@ class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
     }
 
     Future<Object> initiateDirectCommit() {
-        final Future<Object> messageFuture = initiateCommit(true);
+        final Future<Object> messageFuture = initiateCommit(true, Optional.empty());
         messageFuture.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(final Throwable failure, final Object message) throws Throwable {
index 9cf5312..f4ade4f 100644 (file)
@@ -13,6 +13,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
@@ -78,9 +80,10 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+            final Optional<SortedSet<String>> participatingShardNames) {
         final LocalThreePhaseCommitCohort cohort = ready();
-        return cohort.initiateCoordinatedCommit();
+        return cohort.initiateCoordinatedCommit(participatingShardNames);
     }
 
     @Override
index d14a936..03ed5ad 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
@@ -41,7 +43,8 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+            final Optional<SortedSet<String>> participatingShardNamess) {
         LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
index fe5dba8..f28d0d0 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -18,8 +20,8 @@ public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTr
         super(parent, id, modification);
     }
 
-    ShardDataTreeCohort ready() {
+    ShardDataTreeCohort ready(Optional<SortedSet<String>> participatingShardNames) {
         Preconditions.checkState(close(), "Transaction is already closed");
-        return getParent().finishTransaction(this);
+        return getParent().finishTransaction(this, participatingShardNames);
     }
 }
index 27969b3..ce4bda7 100644 (file)
@@ -13,6 +13,8 @@ import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -80,11 +82,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
         // Send the remaining batched modifications, if any, with the ready flag set.
         bumpPermits(havePermit);
-        return sendBatchedModifications(true, true);
+        return sendBatchedModifications(true, true, Optional.empty());
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+            final Optional<SortedSet<String>> participatingShardNames) {
         logModificationCount();
 
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
@@ -92,7 +95,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send the remaining batched modifications, if any, with the ready flag set.
 
         bumpPermits(havePermit);
-        Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
 
         return transformReadyReply(lastModificationsFuture);
     }
@@ -133,10 +136,11 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     protected Future<Object> sendBatchedModifications() {
-        return sendBatchedModifications(false, false);
+        return sendBatchedModifications(false, false, Optional.empty());
     }
 
-    protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
+    protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
+            final Optional<SortedSet<String>> participatingShardNames) {
         Future<Object> sent = null;
         if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
             if (batchedModifications == null) {
@@ -146,7 +150,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                     batchedModifications.getModifications().size(), ready);
 
-            batchedModifications.setReady(ready);
             batchedModifications.setDoCommitOnReady(doCommitOnReady);
             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
 
@@ -155,6 +158,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             batchPermits = 0;
 
             if (ready) {
+                batchedModifications.setReady(participatingShardNames);
+                batchedModifications.setDoCommitOnReady(doCommitOnReady);
                 batchedModifications = null;
             } else {
                 batchedModifications = newBatchedModifications();
index 47a25d7..c3cd8ef 100644 (file)
@@ -161,6 +161,8 @@ public class Shard extends RaftActor {
     /// The name of this shard
     private final String name;
 
+    private final String shardName;
+
     private final ShardStats shardMBean;
 
     private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
@@ -201,6 +203,7 @@ public class Shard extends RaftActor {
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         this.name = builder.getId().toString();
+        this.shardName = builder.getId().getShardName();
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
         this.frontendMetadata = new FrontendMetadata(name);
@@ -586,6 +589,10 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    String getShardName() {
+        return shardName;
+    }
+
     @Override
     protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
             final short leaderPayloadVersion) {
@@ -754,7 +761,8 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
 
                 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
-                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+                        forwardedReady.getParticipatingShardNames());
                 readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(readyLocal, getContext());
             }
@@ -923,6 +931,8 @@ public class Shard extends RaftActor {
                             messagesToForward.size(), leader);
 
                     for (Object message : messagesToForward) {
+                        LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
                         leader.tell(message, self());
                     }
                 }
index 080d3ee..0dd50d4 100644 (file)
@@ -103,7 +103,7 @@ final class ShardCommitCoordinator {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionId(), ready.getTxnClientVersion());
 
-        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
 
@@ -162,7 +162,7 @@ final class ShardCommitCoordinator {
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
 
             if (batched.isDoCommitOnReady()) {
                 cohortEntry.setReplySender(sender);
@@ -187,7 +187,8 @@ final class ShardCommitCoordinator {
      */
     void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
         final TransactionIdentifier txId = message.getTransactionId();
-        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
+        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+                message.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
@@ -227,7 +228,9 @@ final class ShardCommitCoordinator {
 
         BatchedModifications last = newModifications.getLast();
         last.setDoCommitOnReady(from.isDoCommitOnReady());
-        last.setReady(from.isReady());
+        if (from.isReady()) {
+            last.setReady(from.getParticipatingShardNames());
+        }
         last.setTotalMessagesSent(newModifications.size());
         return newModifications;
     }
@@ -454,7 +457,7 @@ final class ShardCommitCoordinator {
             if (last != null) {
                 final boolean immediate = cohortEntry.isDoImmediateCommit();
                 last.setDoCommitOnReady(immediate);
-                last.setReady(true);
+                last.setReady(cohortEntry.getParticipatingShardNames());
                 last.setTotalMessagesSent(newMessages.size());
 
                 messages.addAll(newMessages);
index 6d9f78a..5f1e67e 100644 (file)
@@ -27,11 +27,14 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -100,6 +103,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             this.cohort = Preconditions.checkNotNull(cohort);
             lastAccess = now;
         }
+
+        @Override
+        public String toString() {
+            return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+        }
     }
 
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
@@ -115,7 +123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
-    private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
+    private final Deque<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
 
@@ -643,11 +651,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
-        return createReadyCohort(transaction.getIdentifier(), snapshot);
+        return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
     }
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
@@ -786,13 +795,108 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
         if (!cohort.equals(head.cohort)) {
-            LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
-            return;
+            // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this
+            // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx
+            // has other participating shards, it could deadlock with other tx's accessing the same shards
+            // depending on the order the tx's are readied on each shard
+            // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating
+            // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx
+            // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock
+            // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since
+            // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of
+            // the queue as a result, then proceed with canCommit.
+
+            Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
+            if (precedingShardNames.isEmpty()) {
+                LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+                return;
+            }
+
+            LOG.debug("{}: Evaluating tx {} for canCommit -  preceding participating shard names {}",
+                    logContext, cohort.getIdentifier(), precedingShardNames);
+            final Iterator<CommitEntry> iter = pendingTransactions.iterator();
+            int index = -1;
+            int moveToIndex = -1;
+            while (iter.hasNext()) {
+                final CommitEntry entry = iter.next();
+                ++index;
+
+                if (cohort.equals(entry.cohort)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
+                            logContext, cohort.getIdentifier(), moveToIndex);
+                    iter.remove();
+                    insertEntry(pendingTransactions, entry, moveToIndex);
+
+                    if (!cohort.equals(pendingTransactions.peek().cohort)) {
+                        LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
+                            logContext, cohort.getIdentifier());
+                    break;
+                }
+
+                if (entry.cohort.getState() != State.READY) {
+                    LOG.debug("{}: Skipping pending transaction {} in state {}",
+                            logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+                    continue;
+                }
+
+                final Collection<String> pendingPrecedingShardNames = extractPrecedingShardNames(
+                        entry.cohort.getParticipatingShardNames());
+
+                if (precedingShardNames.equals(pendingPrecedingShardNames)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
+                                logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+                        moveToIndex = index;
+                    } else {
+                        LOG.debug(
+                            "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
+                            logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+                    }
+                } else {
+                    LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
+                        logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+                }
+            }
         }
 
         processNextPendingTransaction();
     }
 
+    private void insertEntry(Deque<CommitEntry> queue, CommitEntry entry, int atIndex) {
+        if (atIndex == 0) {
+            queue.addFirst(entry);
+            return;
+        }
+
+        LOG.trace("Inserting into Deque at index {}", atIndex);
+
+        Deque<CommitEntry> tempStack = new ArrayDeque<>(atIndex);
+        for (int i = 0; i < atIndex; i++) {
+            tempStack.push(queue.poll());
+        }
+
+        queue.addFirst(entry);
+
+        tempStack.forEach(queue::addFirst);
+    }
+
+    private Collection<String> extractPrecedingShardNames(
+            java.util.Optional<SortedSet<String>> participatingShardNames) {
+        return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
+            set -> set.headSet(shard.getShardName())).orElse(Collections.<String>emptyList());
+    }
+
     private void failPreCommit(final Throwable cause) {
         shard.getShardMBean().incrementFailedTransactionsCount();
         pendingTransactions.poll().cohort.failedPreCommit(cause);
@@ -950,22 +1054,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
-                        COMMIT_STEP_TIMEOUT));
+                        COMMIT_STEP_TIMEOUT), participatingShardNames);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
 
     // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
     // the newReadWriteTransaction()
-    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         if (txId.getHistoryId().getHistoryId() == 0) {
-            return createReadyCohort(txId, mod);
+            return createReadyCohort(txId, mod, participatingShardNames);
         }
 
-        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
index d989cbf..581768c 100644 (file)
@@ -12,6 +12,8 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -43,6 +45,8 @@ public abstract class ShardDataTreeCohort implements Identifiable<TransactionIde
 
     abstract DataTreeModification getDataTreeModification();
 
+    abstract Optional<SortedSet<String>> getParticipatingShardNames();
+
     // FIXME: Should return rebased DataTreeCandidateTip
     @VisibleForTesting
     public abstract void canCommit(FutureCallback<Void> callback);
index f2e9af3..a774d64 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.SortedSet;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -88,13 +90,14 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+            final Optional<SortedSet<String>> participatingShardNames) {
         Preconditions.checkState(openTransaction != null,
                 "Attempted to finish transaction %s while none is outstanding", transaction);
 
         // dataTree is finalizing ready the transaction, we just record it for the next
         // transaction in chain
-        final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction);
+        final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction, participatingShardNames);
         openTransaction = null;
         previousTx = transaction;
         LOG.debug("Committing transaction {}", transaction);
@@ -125,7 +128,8 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
-        return dataTree.createReadyCohort(txId, mod);
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final Optional<SortedSet<String>> participatingShardNames) {
+        return dataTree.createReadyCohort(txId, mod, participatingShardNames);
     }
 }
index 23fa0c2..0db6f08 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -16,9 +18,11 @@ abstract class ShardDataTreeTransactionParent {
 
     abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
 
-    abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+    abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction,
+            Optional<SortedSet<String>> participatingShardNames);
 
-    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod);
+    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod,
+            Optional<SortedSet<String>> participatingShardNames);
 
     abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier txId, DataTreeModification mod,
             Exception failure);
index dc77b29..b3f4b0b 100644 (file)
@@ -77,7 +77,7 @@ public class ShardWriteTransaction extends ShardTransaction {
                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
                 }
 
-                readyTransaction(false, batched.isDoCommitOnReady(), batched.getVersion());
+                readyTransaction(batched);
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
@@ -109,13 +109,13 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit, short clientTxVersion) {
+    private void readyTransaction(BatchedModifications batched) {
         TransactionIdentifier transactionID = getTransactionId();
 
         LOG.debug("readyTransaction : {}", transactionID);
 
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, clientTxVersion,
-                transaction, doImmediateCommit), getContext());
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, batched.getVersion(),
+                transaction, batched.isDoCommitOnReady(), batched.getParticipatingShardNames()), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
index 3104cb4..b5b49c2 100644 (file)
@@ -7,13 +7,17 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import java.util.Optional;
+import java.util.SortedSet;
 import java.util.concurrent.CompletionStage;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
@@ -28,6 +32,8 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     private final ShardDataTree dataTree;
     private final TransactionIdentifier transactionId;
     private final CompositeDataTreeCohort userCohorts;
+    @Nullable
+    private final SortedSet<String> participatingShardNames;
 
     private State state = State.READY;
     private DataTreeCandidateTip candidate;
@@ -35,20 +41,23 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     private Exception nextFailure;
 
     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
-            final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
-        this.dataTree = Preconditions.checkNotNull(dataTree);
-        this.transaction = Preconditions.checkNotNull(transaction);
-        this.transactionId = Preconditions.checkNotNull(transactionId);
-        this.userCohorts = Preconditions.checkNotNull(userCohorts);
+            final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts,
+            final Optional<SortedSet<String>> participatingShardNames) {
+        this.dataTree = requireNonNull(dataTree);
+        this.transaction = requireNonNull(transaction);
+        this.transactionId = requireNonNull(transactionId);
+        this.userCohorts = requireNonNull(userCohorts);
+        this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
     }
 
     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
         final TransactionIdentifier transactionId, final Exception nextFailure) {
-        this.dataTree = Preconditions.checkNotNull(dataTree);
-        this.transaction = Preconditions.checkNotNull(transaction);
-        this.transactionId = Preconditions.checkNotNull(transactionId);
+        this.dataTree = requireNonNull(dataTree);
+        this.transaction = requireNonNull(transaction);
+        this.transactionId = requireNonNull(transactionId);
         this.userCohorts = null;
-        this.nextFailure = Preconditions.checkNotNull(nextFailure);
+        this.participatingShardNames = null;
+        this.nextFailure = requireNonNull(nextFailure);
     }
 
     @Override
@@ -66,8 +75,14 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         return transaction;
     }
 
+    @Override
+    Optional<SortedSet<String>> getParticipatingShardNames() {
+        return Optional.ofNullable(participatingShardNames);
+    }
+
     private void checkState(final State expected) {
-        Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
+        Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s",
+                state, expected, getIdentifier());
     }
 
     @Override
@@ -77,7 +92,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         }
 
         checkState(State.READY);
-        this.callback = Preconditions.checkNotNull(newCallback);
+        this.callback = requireNonNull(newCallback);
         state = State.CAN_COMMIT_PENDING;
 
         if (nextFailure == null) {
@@ -90,7 +105,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     @Override
     public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
         checkState(State.CAN_COMMIT_COMPLETE);
-        this.callback = Preconditions.checkNotNull(newCallback);
+        this.callback = requireNonNull(newCallback);
         state = State.PRE_COMMIT_PENDING;
 
         if (nextFailure == null) {
@@ -128,7 +143,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     @Override
     public void commit(final FutureCallback<UnsignedLong> newCallback) {
         checkState(State.PRE_COMMIT_COMPLETE);
-        this.callback = Preconditions.checkNotNull(newCallback);
+        this.callback = requireNonNull(newCallback);
         state = State.COMMIT_PENDING;
 
         if (nextFailure == null) {
@@ -257,7 +272,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
 
     void reportFailure(final Exception cause) {
         if (nextFailure == null) {
-            this.nextFailure = Preconditions.checkNotNull(cause);
+            this.nextFailure = requireNonNull(cause);
         } else {
             LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause);
         }
index 7fd53c7..3d4e373 100644 (file)
@@ -14,6 +14,8 @@ import com.google.common.collect.TreeRangeSet;
 import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -79,7 +81,8 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
-        return tree.createReadyCohort(id, mod);
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod,
+            final Optional<SortedSet<String>> participatingShardNames) {
+        return tree.createReadyCohort(id, mod, participatingShardNames);
     }
 }
index fdcd306..90dcec2 100644 (file)
@@ -216,7 +216,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         for (CohortInfo cohort : cohorts) {
             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
 
-            LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
+            LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
 
             futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
                     actorContext.getTransactionCommitOperationTimeout()));
index 543834c..d9a53ab 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import scala.concurrent.Future;
@@ -20,7 +22,7 @@ import scala.concurrent.Future;
 interface TransactionContext {
     void closeTransaction();
 
-    Future<ActorSelection> readyTransaction(Boolean havePermit);
+    Future<ActorSelection> readyTransaction(Boolean havePermit, Optional<SortedSet<String>> participatingShardNames);
 
     void executeModification(AbstractModification modification, Boolean havePermit);
 
index d11ee3e..38f55f3 100644 (file)
@@ -15,6 +15,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -194,18 +196,18 @@ class TransactionContextWrapper {
         }
     }
 
-    Future<ActorSelection> readyTransaction() {
+    Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
         // avoid the creation of a promise and a TransactionOperation
         final TransactionContext localContext = transactionContext;
         if (localContext != null) {
-            return localContext.readyTransaction(null);
+            return localContext.readyTransaction(null, participatingShardNames);
         }
 
         final Promise<ActorSelection> promise = Futures.promise();
         enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
-                promise.completeWith(newTransactionContext.readyTransaction(havePermit));
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
             }
         });
 
index 5a1cb67..b04dd29 100644 (file)
@@ -25,7 +25,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -61,28 +63,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
 
-    // Global lock used for transactions spanning multiple shards - synchronizes sending of the ready messages
-    // for atomicity to avoid potential deadlock with concurrent transactions spanning the same shards as outlined
-    // in the following scenario:
-    //
-    //  - Tx1 sends ready message to shard A
-    //  - Tx2 sends ready message to shard A
-    //  - Tx2 sends ready message to shard B
-    //  - Tx1 sends ready message to shard B
-    //
-    // This scenario results in deadlock: after Tx1 canCommits to shard A, it can't proceed with shard B until Tx2
-    // completes as Tx2 was readied first on shard B. However Tx2 cannot make progress because it's waiting to canCommit
-    // on shard A which is blocked by Tx1.
-    //
-    // The global lock avoids this as it forces the ready messages to be sent in a predictable order:
-    //
-    //  - Tx1 sends ready message to shard A
-    //  - Tx1 sends ready message to shard B
-    //  - Tx2 sends ready message to shard A
-    //  - Tx2 sends ready message to shard B
-    //
-    private static final Object GLOBAL_TX_READY_LOCK = new Object();
-
     private final Map<String, TransactionContextWrapper> txContextWrappers = new TreeMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
     private final TransactionType type;
@@ -251,7 +231,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 ret = createSingleCommitCohort(e.getKey(), e.getValue());
                 break;
             default:
-                ret = createMultiCommitCohort(txContextWrappers.entrySet());
+                ret = createMultiCommitCohort();
         }
 
         txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
@@ -299,24 +279,23 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return transactionContext.directCommit(havePermit);
     }
 
-    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
-            final Set<Entry<String, TransactionContextWrapper>> txContextWrapperEntries) {
-
-        final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrapperEntries.size());
+    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
 
-        synchronized (GLOBAL_TX_READY_LOCK) {
-            for (Entry<String, TransactionContextWrapper> e : txContextWrapperEntries) {
-                LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
+        final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
+        final java.util.Optional<SortedSet<String>> shardNames =
+                java.util.Optional.of(new TreeSet<>(txContextWrappers.keySet()));
+        for (Entry<String, TransactionContextWrapper> e : txContextWrappers.entrySet()) {
+            LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-                final TransactionContextWrapper wrapper = e.getValue();
+            final TransactionContextWrapper wrapper = e.getValue();
 
-                // The remote tx version is obtained the via TransactionContext which may not be available yet so
-                // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
-                // TransactionContext is available.
-                Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();
+            // The remote tx version is obtained the via TransactionContext which may not be available yet so
+            // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
+            // TransactionContext is available.
+            Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();
 
-                cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
-            }
+            cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames),
+                    txVersionSupplier));
         }
 
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, getIdentifier());
index 6058e7e..4e344cd 100644 (file)
@@ -237,7 +237,9 @@ class EntityOwnershipShardCommitCoordinator {
         BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(),
                 toPrune.getVersion());
         prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
-        prunedModifications.setReady(toPrune.isReady());
+        if (toPrune.isReady()) {
+            prunedModifications.setReady(toPrune.getParticipatingShardNames());
+        }
         prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
         for (Modification mod: toPrune.getModifications()) {
             if (canForwardModificationToNewLeader(mod)) {
@@ -275,7 +277,7 @@ class EntityOwnershipShardCommitCoordinator {
         BatchedModifications modifications = new BatchedModifications(
             new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
         modifications.setDoCommitOnReady(true);
-        modifications.setReady(true);
+        modifications.setReady();
         modifications.setTotalMessagesSent(1);
         return modifications;
     }
index 3773bee..b38cd87 100644 (file)
@@ -7,11 +7,18 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.Optional;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 
 /**
@@ -26,21 +33,33 @@ public class BatchedModifications extends MutableCompositeModification {
     private boolean doCommitOnReady;
     private int totalMessagesSent;
     private TransactionIdentifier transactionId;
+    @Nullable
+    private SortedSet<String> participatingShardNames;
 
     public BatchedModifications() {
     }
 
     public BatchedModifications(TransactionIdentifier transactionId, short version) {
         super(version);
-        this.transactionId = Preconditions.checkNotNull(transactionId, "transactionID can't be null");
+        this.transactionId = requireNonNull(transactionId, "transactionID can't be null");
     }
 
     public boolean isReady() {
         return ready;
     }
 
-    public void setReady(boolean ready) {
-        this.ready = ready;
+    public void setReady(Optional<SortedSet<String>> possibleParticipatingShardNames) {
+        this.ready = true;
+        this.participatingShardNames = requireNonNull(possibleParticipatingShardNames).orElse(null);
+        Preconditions.checkArgument(this.participatingShardNames == null || this.participatingShardNames.size() > 1);
+    }
+
+    public void setReady() {
+        setReady(Optional.empty());
+    }
+
+    public Optional<SortedSet<String>> getParticipatingShardNames() {
+        return Optional.ofNullable(participatingShardNames);
     }
 
     public boolean isDoCommitOnReady() {
@@ -63,7 +82,6 @@ public class BatchedModifications extends MutableCompositeModification {
         return transactionId;
     }
 
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
@@ -71,6 +89,18 @@ public class BatchedModifications extends MutableCompositeModification {
         ready = in.readBoolean();
         totalMessagesSent = in.readInt();
         doCommitOnReady = in.readBoolean();
+
+        if (getVersion() >= DataStoreVersions.FLUORINE_VERSION) {
+            final int count = in.readInt();
+            if (count != 0) {
+                SortedSet<String> shardNames = new TreeSet<>();
+                for (int i = 0; i < count; i++) {
+                    shardNames.add((String) in.readObject());
+                }
+
+                participatingShardNames = shardNames;
+            }
+        }
     }
 
     @Override
@@ -80,12 +110,24 @@ public class BatchedModifications extends MutableCompositeModification {
         out.writeBoolean(ready);
         out.writeInt(totalMessagesSent);
         out.writeBoolean(doCommitOnReady);
+
+        if (getVersion() >= DataStoreVersions.FLUORINE_VERSION) {
+            if (participatingShardNames != null) {
+                out.writeInt(participatingShardNames.size());
+                for (String shardName: participatingShardNames) {
+                    out.writeObject(shardName);
+                }
+            } else {
+                out.writeInt(0);
+            }
+        }
     }
 
     @Override
     public String toString() {
         return "BatchedModifications [transactionId=" + transactionId
-                + ", ready=" + ready
+                + ", ready=" + isReady()
+                + ", participatingShardNames=" + participatingShardNames
                 + ", totalMessagesSent=" + totalMessagesSent
                 + ", modifications size=" + getModifications().size() + "]";
     }
index 9cd5e66..529b7e2 100644 (file)
@@ -7,7 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.SortedSet;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.ReadWriteShardDataTreeTransaction;
 
@@ -21,13 +25,17 @@ public class ForwardedReadyTransaction {
     private final ReadWriteShardDataTreeTransaction transaction;
     private final boolean doImmediateCommit;
     private final short txnClientVersion;
+    @Nullable
+    private final SortedSet<String> participatingShardNames;
 
     public ForwardedReadyTransaction(TransactionIdentifier transactionId, short txnClientVersion,
-            ReadWriteShardDataTreeTransaction transaction, boolean doImmediateCommit) {
-        this.transactionId = Preconditions.checkNotNull(transactionId);
-        this.transaction = Preconditions.checkNotNull(transaction);
+            ReadWriteShardDataTreeTransaction transaction, boolean doImmediateCommit,
+            Optional<SortedSet<String>> participatingShardNames) {
+        this.transactionId = requireNonNull(transactionId);
+        this.transaction = requireNonNull(transaction);
         this.txnClientVersion = txnClientVersion;
         this.doImmediateCommit = doImmediateCommit;
+        this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
     }
 
     public TransactionIdentifier getTransactionId() {
@@ -46,9 +54,14 @@ public class ForwardedReadyTransaction {
         return doImmediateCommit;
     }
 
+    public Optional<SortedSet<String>> getParticipatingShardNames() {
+        return Optional.ofNullable(participatingShardNames);
+    }
+
     @Override
     public String toString() {
-        return "ForwardedReadyTransaction [transactionId=" + transactionId + ", doImmediateCommit=" + doImmediateCommit
+        return "ForwardedReadyTransaction [transactionId=" + transactionId + ", transaction=" + transaction
+                + ", doImmediateCommit=" + doImmediateCommit + ", participatingShardNames=" + participatingShardNames
                 + ", txnClientVersion=" + txnClientVersion + "]";
     }
 }
index 2664fc1..bff6ea8 100644 (file)
@@ -7,7 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.SortedSet;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -23,15 +27,18 @@ public final class ReadyLocalTransaction {
     private final DataTreeModification modification;
     private final TransactionIdentifier transactionId;
     private final boolean doCommitOnReady;
+    @Nullable
+    private final SortedSet<String> participatingShardNames;
 
     // The version of the remote system used only when needing to convert to BatchedModifications.
     private short remoteVersion = DataStoreVersions.CURRENT_VERSION;
 
     public ReadyLocalTransaction(final TransactionIdentifier transactionId, final DataTreeModification modification,
-            final boolean doCommitOnReady) {
-        this.transactionId = Preconditions.checkNotNull(transactionId);
-        this.modification = Preconditions.checkNotNull(modification);
+            final boolean doCommitOnReady, Optional<SortedSet<String>> participatingShardNames) {
+        this.transactionId = requireNonNull(transactionId);
+        this.modification = requireNonNull(modification);
         this.doCommitOnReady = doCommitOnReady;
+        this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
     }
 
     public TransactionIdentifier getTransactionId() {
@@ -53,4 +60,8 @@ public final class ReadyLocalTransaction {
     public void setRemoteVersion(short remoteVersion) {
         this.remoteVersion = remoteVersion;
     }
+
+    public Optional<SortedSet<String>> getParticipatingShardNames() {
+        return Optional.ofNullable(participatingShardNames);
+    }
 }
index 60a85b9..53731fa 100644 (file)
@@ -48,7 +48,7 @@ public final class ReadyLocalTransactionSerializer extends JSerializer {
                 readyLocal.getRemoteVersion());
         batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady());
         batched.setTotalMessagesSent(1);
-        batched.setReady(true);
+        batched.setReady(readyLocal.getParticipatingShardNames());
 
         readyLocal.getModification().applyToCursor(new BatchedCursor(batched));
 
index f2e0e0a..b289b6b 100644 (file)
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -265,7 +266,7 @@ public abstract class AbstractShardTest extends AbstractActorTest {
                                                              final boolean doCommitOnReady) {
         final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION);
         batchedModifications.addModification(modification);
-        batchedModifications.setReady(true);
+        batchedModifications.setReady();
         batchedModifications.setDoCommitOnReady(doCommitOnReady);
         batchedModifications.setTotalMessagesSent(1);
         return batchedModifications;
@@ -284,7 +285,7 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore()
                 .newReadWriteTransaction(transactionID);
         rwTx.getSnapshot().write(path, data);
-        return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady);
+        return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady, Optional.empty());
     }
 
     public static NormalizedNode<?,?> readStore(final TestActorRef<? extends Shard> shard,
@@ -330,7 +331,7 @@ public abstract class AbstractShardTest extends AbstractActorTest {
             final NormalizedNode<?,?> node) throws DataValidationFailedException {
         final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION);
         batched.addModification(new MergeModification(id, node));
-        batched.setReady(true);
+        batched.setReady();
         batched.setDoCommitOnReady(true);
         batched.setTotalMessagesSent(1);
 
@@ -366,12 +367,24 @@ public abstract class AbstractShardTest extends AbstractActorTest {
             final boolean doCommitOnReady, final int messagesSent) {
         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
         batched.addModification(new WriteModification(path, data));
-        batched.setReady(ready);
+        if (ready) {
+            batched.setReady();
+        }
         batched.setDoCommitOnReady(doCommitOnReady);
         batched.setTotalMessagesSent(messagesSent);
         return batched;
     }
 
+    static BatchedModifications newReadyBatchedModifications(final TransactionIdentifier transactionID,
+            final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+            final SortedSet<String> participatingShardNames) {
+        final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
+        batched.addModification(new WriteModification(path, data));
+        batched.setReady(Optional.of(participatingShardNames));
+        batched.setTotalMessagesSent(1);
+        return batched;
+    }
+
     @SuppressWarnings("unchecked")
     static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
@@ -519,5 +532,10 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         public State getState() {
             return delegate.getState();
         }
+
+        @Override
+        Optional<SortedSet<String>> getParticipatingShardNames() {
+            return delegate.getParticipatingShardNames();
+        }
     }
 }
index 0d35a17..a998fbf 100644 (file)
@@ -98,10 +98,10 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest {
 
     private final Configuration configuration = new MockConfiguration() {
         Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
-                "junk", new ShardStrategy() {
+                TestModel.JUNK_QNAME.getLocalName(), new ShardStrategy() {
                     @Override
                     public String findShard(final YangInstanceIdentifier path) {
-                        return "junk";
+                        return TestModel.JUNK_QNAME.getLocalName();
                     }
 
                     @Override
@@ -109,10 +109,10 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest {
                         return YangInstanceIdentifier.EMPTY;
                     }
                 }).put(
-                "cars", new ShardStrategy() {
+                CarsModel.BASE_QNAME.getLocalName(), new ShardStrategy() {
                     @Override
                     public String findShard(final YangInstanceIdentifier path) {
-                        return "cars";
+                        return CarsModel.BASE_QNAME.getLocalName();
                     }
 
                     @Override
@@ -129,9 +129,9 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest {
         @Override
         public String getModuleNameFromNameSpace(final String nameSpace) {
             if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
-                return "junk";
+                return TestModel.JUNK_QNAME.getLocalName();
             } else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
-                return "cars";
+                return CarsModel.BASE_QNAME.getLocalName();
             }
             return null;
         }
index aaab6b1..4f0f929 100644 (file)
@@ -653,7 +653,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
         modification.ready();
 
-        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
+        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true,
+                java.util.Optional.empty());
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -672,7 +673,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
         modification.ready();
 
-        readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
+        readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty());
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -721,7 +722,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
+                java.util.Optional.empty());
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -741,7 +743,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         forwardedReady = new ForwardedReadyTransaction(tx2,
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
+                java.util.Optional.empty());
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
index c3c6206..847b24b 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -17,6 +18,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import akka.actor.ActorRef;
+import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
@@ -59,7 +61,7 @@ public class FrontendReadWriteTransactionTest {
         shardTransaction = new ReadWriteShardDataTreeTransaction(mockParent, TX_ID, mockModification);
         openTx = FrontendReadWriteTransaction.createOpen(mockHistory, shardTransaction);
 
-        when(mockParent.finishTransaction(same(shardTransaction))).thenReturn(mockCohort);
+        when(mockParent.finishTransaction(same(shardTransaction), eq(Optional.empty()))).thenReturn(mockCohort);
     }
 
     private TransactionSuccess<?> handleRequest(final TransactionRequest<?> request) throws RequestException {
@@ -87,7 +89,7 @@ public class FrontendReadWriteTransactionTest {
         final TransactionRequest<?> readyReq = b.build();
 
         assertNotNull(handleRequest(readyReq));
-        verify(mockParent).finishTransaction(same(shardTransaction));
+        verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
 
         assertNotNull(handleRequest(readyReq));
         verifyNoMoreInteractions(mockParent);
@@ -101,7 +103,7 @@ public class FrontendReadWriteTransactionTest {
         final TransactionRequest<?> readyReq = b.build();
 
         assertNull(handleRequest(readyReq));
-        verify(mockParent).finishTransaction(same(shardTransaction));
+        verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
 
         assertNull(handleRequest(readyReq));
         verifyNoMoreInteractions(mockParent);
@@ -115,7 +117,7 @@ public class FrontendReadWriteTransactionTest {
         final TransactionRequest<?> readyReq = b.build();
 
         assertNull(handleRequest(readyReq));
-        verify(mockParent).finishTransaction(same(shardTransaction));
+        verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
 
         assertNull(handleRequest(readyReq));
         verifyNoMoreInteractions(mockParent);
@@ -129,7 +131,7 @@ public class FrontendReadWriteTransactionTest {
         final TransactionRequest<?> readyReq = b.build();
 
         assertNotNull(handleRequest(readyReq));
-        verify(mockParent).finishTransaction(same(shardTransaction));
+        verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
 
         handleRequest(new ReadTransactionRequest(TX_ID, 0, mock(ActorRef.class), YangInstanceIdentifier.EMPTY, true));
     }
@@ -142,7 +144,7 @@ public class FrontendReadWriteTransactionTest {
         final TransactionRequest<?> readyReq = b.build();
 
         assertNotNull(handleRequest(readyReq));
-        verify(mockParent).finishTransaction(same(shardTransaction));
+        verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
 
         b.setSequence(1);
         b.addModification(mock(TransactionModification.class));
index 0df870c..1fb2f8e 100644 (file)
@@ -112,10 +112,11 @@ public class LocalTransactionContextTest {
     @Test
     public void testReady() {
         final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
-        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(
+                java.util.Optional.empty());
         doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
 
-        Future<ActorSelection> future = localTransactionContext.readyTransaction(null);
+        Future<ActorSelection> future = localTransactionContext.readyTransaction(null, java.util.Optional.empty());
         assertTrue(future.isCompleted());
 
         verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
@@ -171,9 +172,10 @@ public class LocalTransactionContextTest {
 
     private void doReadyWithExpectedError(final RuntimeException expError) {
         LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
-        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(
+                java.util.Optional.empty());
         doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
 
-        localTransactionContext.readyTransaction(null);
+        localTransactionContext.readyTransaction(null, java.util.Optional.empty());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java
new file mode 100644 (file)
index 0000000..5239ff7
--- /dev/null
@@ -0,0 +1,563 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertNotNull;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.ID_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerMapNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
+
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.collect.ImmutableSortedSet;
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for various 3PC coordination scenarios.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardCommitCoordinationTest extends AbstractShardTest {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
+
+    /**
+     * Test 2 tx's accessing the same shards.
+     * <pre>
+     *   tx1 -> shard A, shard B
+     *   tx2 -> shard A, shard B
+     * </pre>
+     * The tx's are readied such the pendingTransactions queue are as follows:
+     * <pre>
+     *   Queue for shard A -> tx1, tx2
+     *   Queue for shard B -> tx2, tx1
+     * </pre>
+     * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
+     * even though it isn't at the head of the queues.
+     */
+    @Test
+    public void testTwoTransactionsWithSameTwoParticipatingShards() throws Exception {
+        final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
+        LOG.info("{} starting", testName);
+
+        final TestKit kit1 = new TestKit(getSystem());
+        final TestKit kit2 = new TestKit(getSystem());
+
+        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+
+        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardA);
+
+        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardB);
+
+        final TransactionIdentifier txId1 = nextTransactionId();
+        final TransactionIdentifier txId2 = nextTransactionId();
+
+        SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
+                shardBId.getShardName());
+
+        // Ready [tx1, tx2] on shard A.
+
+        shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
+        kit1.expectMsgClass(ReadyTransactionReply.class);
+
+        shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
+                participatingShardNames), kit2.getRef());
+        kit2.expectMsgClass(ReadyTransactionReply.class);
+
+        // Ready [tx2, tx1] on shard B.
+
+        shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
+                participatingShardNames), kit2.getRef());
+        kit2.expectMsgClass(ReadyTransactionReply.class);
+
+        shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
+        kit1.expectMsgClass(ReadyTransactionReply.class);
+
+        // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
+        // in the participating shard list.
+
+        shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+        // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
+
+        shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
+        // shard list [A] matches that of tx2 [A] so tx1 should be de-queued and allowed to proceed.
+
+        shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Send tx2 CanCommit to B - tx1 should now be at the head of he queue.
+
+        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+        // Finish commit of tx1.
+
+        shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CommitTransactionReply.class);
+
+        // Finish commit of tx2.
+
+        kit2.expectMsgClass(CanCommitTransactionReply.class);
+        kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+        shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CommitTransactionReply.class);
+
+        // Verify data in the data store.
+
+        verifyOuterListEntry(shardA, 1);
+        verifyOuterListEntry(shardB, 1);
+
+        LOG.info("{} ending", testName);
+    }
+
+    /**
+     * Test multiple tx's accessing a mix of same and differing shards.
+     * <pre>
+     *   tx1 -> shard X, shard B
+     *   tx2 -> shard X, shard B
+     *   tx3 -> shard A, shard B
+     *   tx4 -> shard A, shard B
+     *   tx5 -> shard A, shard B
+     * </pre>
+     * The tx's are readied such the pendingTransactions queue are as follows:
+     * <pre>
+     *   Queue for shard A -> tx3, tx4, tx5
+     *   Queue for shard B -> tx1, tx2, tx5, tx4, tx3
+     * </pre>
+     * Note: shard X means any other shard which isn't relevant for the test.
+     * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
+     * CanCommit is requested.
+     */
+    @Test
+    public void testMultipleTransactionsWithMixedParticipatingShards() throws Exception {
+        final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
+        LOG.info("{} starting", testName);
+
+        final TestKit kit1 = new TestKit(getSystem());
+        final TestKit kit2 = new TestKit(getSystem());
+        final TestKit kit3 = new TestKit(getSystem());
+        final TestKit kit4 = new TestKit(getSystem());
+        final TestKit kit5 = new TestKit(getSystem());
+
+        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+
+        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardA);
+
+        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardB);
+
+        final TransactionIdentifier txId1 = nextTransactionId();
+        final TransactionIdentifier txId2 = nextTransactionId();
+        final TransactionIdentifier txId3 = nextTransactionId();
+        final TransactionIdentifier txId4 = nextTransactionId();
+        final TransactionIdentifier txId5 = nextTransactionId();
+
+        final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
+                shardBId.getShardName());
+        final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
+
+        // Ready [tx3, tx4, tx5] on shard A.
+
+        shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit3.getRef());
+        kit3.expectMsgClass(ReadyTransactionReply.class);
+
+        shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, outerMapNode(),
+                participatingShardNames1), kit4.getRef());
+        kit4.expectMsgClass(ReadyTransactionReply.class);
+
+        shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1),
+                ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit5.getRef());
+        kit5.expectMsgClass(ReadyTransactionReply.class);
+
+        // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
+
+        shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit1.getRef());
+        kit1.expectMsgClass(ReadyTransactionReply.class);
+
+        shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerMapNode(),
+                participatingShardNames2), kit2.getRef());
+        kit2.expectMsgClass(ReadyTransactionReply.class);
+
+        shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
+                ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
+        kit5.expectMsgClass(ReadyTransactionReply.class);
+
+        shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
+                participatingShardNames1), kit4.getRef());
+        kit4.expectMsgClass(ReadyTransactionReply.class);
+
+        shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1),
+                ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit3.getRef());
+        kit3.expectMsgClass(ReadyTransactionReply.class);
+
+        // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
+
+        shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+        kit3.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
+
+        shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
+        // shard list [A] matches that of tx5 so tx3 should be moved ahead of tx5 in the queue.
+
+        shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+        kit3.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+        // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should
+        // be moved ahead of tx5 in the queue but not tx3 since should be in the CAN_COMMIT_PENDING state.
+
+        shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+        kit4.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+        // Send tx5 CanCommit to B - it's position in the queue should remain the same.
+
+        shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+        kit5.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+        // Finish commit of tx1.
+
+        shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CommitTransactionReply.class);
+
+        // Finish commit of tx2.
+
+        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CommitTransactionReply.class);
+
+        // Finish commit of tx3.
+
+        // From shard B
+        kit3.expectMsgClass(CanCommitTransactionReply.class);
+
+        shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+        kit3.expectMsgClass(CommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+        kit3.expectMsgClass(CommitTransactionReply.class);
+
+        // Finish commit of tx4.
+
+        // From shard B
+        kit4.expectMsgClass(CanCommitTransactionReply.class);
+
+        shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+        kit4.expectMsgClass(CanCommitTransactionReply.class);
+        shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+        kit4.expectMsgClass(CommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+        kit4.expectMsgClass(CommitTransactionReply.class);
+
+        // Finish commit of tx5.
+
+        // From shard B
+        kit5.expectMsgClass(CanCommitTransactionReply.class);
+
+        shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+        kit5.expectMsgClass(CanCommitTransactionReply.class);
+        shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+        kit5.expectMsgClass(CommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+        kit5.expectMsgClass(CommitTransactionReply.class);
+
+        verifyOuterListEntry(shardA, 1);
+        verifyInnerListEntry(shardB, 1, "one");
+
+        LOG.info("{} ending", testName);
+    }
+
+    /**
+     * Test 2 tx's accessing 2 shards, the second in common.
+     * <pre>
+     *   tx1 -> shard A, shard C
+     *   tx2 -> shard B, shard C
+     * </pre>
+     * The tx's are readied such the pendingTransactions queue are as follows:
+     * <pre>
+     *   Queue for shard A -> tx1
+     *   Queue for shard B -> tx2
+     *   Queue for shard C -> tx2, tx1
+     * </pre>
+     * When the tx's re committed verify the ready order is preserved.
+     */
+    @Test
+    public void testTwoTransactionsWithOneCommonParticipatingShard1() throws Exception {
+        final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
+        LOG.info("{} starting", testName);
+
+        final TestKit kit1 = new TestKit(getSystem());
+        final TestKit kit2 = new TestKit(getSystem());
+
+        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+        final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
+
+        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardA);
+
+        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardB);
+
+        final TestActorRef<Shard> shardC = actorFactory.createTestActor(
+                newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardC);
+
+        final TransactionIdentifier txId1 = nextTransactionId();
+        final TransactionIdentifier txId2 = nextTransactionId();
+
+        SortedSet<String> participatingShardNames1 =
+                ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
+        SortedSet<String> participatingShardNames2 =
+                ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
+
+        // Ready [tx1] on shard A.
+
+        shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
+        kit1.expectMsgClass(ReadyTransactionReply.class);
+
+        // Ready [tx2] on shard B.
+
+        shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+        kit2.expectMsgClass(ReadyTransactionReply.class);
+
+        // Ready [tx2, tx1] on shard C.
+
+        shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+        kit2.expectMsgClass(ReadyTransactionReply.class);
+
+        shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
+                participatingShardNames1), kit1.getRef());
+        kit1.expectMsgClass(ReadyTransactionReply.class);
+
+        // Send tx1 CanCommit to A - should succeed.
+
+        shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Send tx2 CanCommit to B - should succeed.
+
+        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
+        // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed.
+
+        shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+        // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
+
+        shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Finish commit of tx2.
+
+        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CommitTransactionReply.class);
+
+        shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CommitTransactionReply.class);
+
+        // Finish commit of tx1.
+
+        kit1.expectMsgClass(CanCommitTransactionReply.class);
+        shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CommitTransactionReply.class);
+
+        shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CommitTransactionReply.class);
+
+        // Verify data in the data store.
+
+        verifyOuterListEntry(shardC, 1);
+
+        LOG.info("{} ending", testName);
+    }
+
+    /**
+     * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common.
+     * <pre>
+     *   tx1 -> shard A, shard B
+     *   tx2 -> shard B, shard C
+     * </pre>
+     * The tx's are readied such the pendingTransactions queue are as follows:
+     * <pre>
+     *   Queue for shard A -> tx1
+     *   Queue for shard B -> tx2, tx1
+     *   Queue for shard C -> tx2
+     * </pre>
+     * When the tx's re committed verify the ready order is preserved.
+     */
+    @Test
+    public void testTwoTransactionsWithOneCommonParticipatingShard2() throws Exception {
+        final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
+        LOG.info("{} starting", testName);
+
+        final TestKit kit1 = new TestKit(getSystem());
+        final TestKit kit2 = new TestKit(getSystem());
+
+        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+        final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
+
+        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardA);
+
+        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardB);
+
+        final TestActorRef<Shard> shardC = actorFactory.createTestActor(
+                newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardTestKit.waitUntilLeader(shardC);
+
+        final TransactionIdentifier txId1 = nextTransactionId();
+        final TransactionIdentifier txId2 = nextTransactionId();
+
+        SortedSet<String> participatingShardNames1 =
+                ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
+        SortedSet<String> participatingShardNames2 =
+                ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
+
+        // Ready [tx1] on shard A.
+
+        shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
+        kit1.expectMsgClass(ReadyTransactionReply.class);
+
+        // Ready [tx2, tx1] on shard B.
+
+        shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+        kit2.expectMsgClass(ReadyTransactionReply.class);
+
+        shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
+                participatingShardNames1), kit1.getRef());
+        kit1.expectMsgClass(ReadyTransactionReply.class);
+
+        // Ready [tx2] on shard C.
+
+        shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+                ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+        kit2.expectMsgClass(ReadyTransactionReply.class);
+
+        // Send tx1 CanCommit to A - should succeed.
+
+        shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
+        // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed.
+
+        shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+        // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
+
+        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+        // Finish commit of tx2.
+
+        shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CommitTransactionReply.class);
+
+        shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+        kit2.expectMsgClass(CommitTransactionReply.class);
+
+        // Finish commit of tx1.
+
+        kit1.expectMsgClass(CanCommitTransactionReply.class);
+        shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CommitTransactionReply.class);
+
+        shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+        kit1.expectMsgClass(CommitTransactionReply.class);
+
+        // Verify data in the data store.
+
+        verifyOuterListEntry(shardB, 1);
+
+        LOG.info("{} ending", testName);
+    }
+
+    static void verifyInnerListEntry(TestActorRef<Shard> shard, int outerID, String innerID)
+            throws Exception {
+        final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
+        final NormalizedNode<?, ?> innerListEntry = readStore(shard, path);
+        assertNotNull(path + " not found", innerListEntry);
+    }
+}
index 6e060f0..eb829f1 100644 (file)
@@ -113,7 +113,7 @@ public class ShardDataTreeTest extends AbstractTest {
             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
         }
 
-        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
@@ -482,7 +482,7 @@ public class ShardDataTreeTest extends AbstractTest {
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
         final DataTreeModification snapshot = transaction.getSnapshot();
         operation.execute(snapshot);
-        return shardDataTree.finishTransaction(transaction);
+        return shardDataTree.finishTransaction(transaction, Optional.empty());
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -541,7 +541,7 @@ public class ShardDataTreeTest extends AbstractTest {
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
         final DataTreeModification snapshot = transaction.getSnapshot();
         operation.execute(snapshot);
-        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
@@ -559,7 +559,7 @@ public class ShardDataTreeTest extends AbstractTest {
         for (final DataTreeCandidate candidateTip : candidates) {
             DataTreeCandidates.applyToModification(snapshot, candidateTip);
         }
-        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
index cca0bff..e8f7e32 100644 (file)
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -671,7 +672,7 @@ public class ShardTest extends AbstractShardTest {
                 final TransactionIdentifier transactionID = nextTransactionId();
                 final BatchedModifications batched = new BatchedModifications(transactionID,
                         DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
+                batched.setReady();
                 batched.setTotalMessagesSent(2);
 
                 shard.tell(batched, getRef());
@@ -714,7 +715,7 @@ public class ShardTest extends AbstractShardTest {
                 final Throwable cause = failure.cause();
 
                 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
+                batched.setReady();
                 batched.setTotalMessagesSent(2);
 
                 shard.tell(batched, getRef());
@@ -845,7 +846,8 @@ public class ShardTest extends AbstractShardTest {
                 failure = expectMsgClass(Failure.class);
                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
 
-                shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
+                shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
+                        getRef());
                 failure = expectMsgClass(Failure.class);
                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
             }
@@ -910,7 +912,8 @@ public class ShardTest extends AbstractShardTest {
 
                 final TransactionIdentifier txId = nextTransactionId();
                 modification.ready();
-                final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+                final ReadyLocalTransaction readyMessage =
+                        new ReadyLocalTransaction(txId, modification, true, Optional.empty());
 
                 shard.tell(readyMessage, getRef());
 
@@ -943,7 +946,8 @@ public class ShardTest extends AbstractShardTest {
 
                 final TransactionIdentifier txId = nextTransactionId();
                 modification.ready();
-                final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+                final ReadyLocalTransaction readyMessage =
+                        new ReadyLocalTransaction(txId, modification, false, Optional.empty());
 
                 shard.tell(readyMessage, getRef());
 
@@ -1571,7 +1575,7 @@ public class ShardTest extends AbstractShardTest {
                         .apply(modification3);
                 modification3.ready();
                 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
-                        true);
+                        true, Optional.empty());
                 shard.tell(readyMessage, getRef());
 
                 // Commit the first Tx. After completing, the second should
index f3acf29..0c894a3 100644 (file)
@@ -244,7 +244,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 assertEquals("getNumBatched", 1, reply.getNumBatched());
 
                 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
+                batched.setReady();
                 batched.setTotalMessagesSent(2);
 
                 transaction.tell(batched, getRef());
@@ -272,7 +272,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
                         DataStoreVersions.CURRENT_VERSION);
                 batched.addModification(new WriteModification(writePath, writeData));
-                batched.setReady(true);
+                batched.setReady();
                 batched.setDoCommitOnReady(true);
                 batched.setTotalMessagesSent(1);
 
@@ -311,7 +311,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 
                 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
+                batched.setReady();
                 batched.setTotalMessagesSent(2);
 
                 transaction.tell(batched, getRef());
@@ -339,7 +339,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
                         DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
+                batched.setReady();
                 batched.setTotalMessagesSent(2);
 
                 transaction.tell(batched, getRef());
index 4550894..be19a3d 100644 (file)
@@ -60,7 +60,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         doReturn(Optional.empty()).when(mockUserCohorts).abort();
 
         cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
-            mockUserCohorts);
+            mockUserCohorts, Optional.empty());
     }
 
     @Test
index 5fbf878..4fa4fcd 100644 (file)
@@ -31,6 +31,7 @@ import akka.dispatch.Futures;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
@@ -38,12 +39,14 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collection;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
@@ -57,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
@@ -524,22 +528,59 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     public void testReadyWithMultipleShardWrites() throws Exception {
         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
-        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
+                TestModel.JUNK_QNAME.getLocalName());
 
         expectBatchedModificationsReady(actorRef1);
         expectBatchedModificationsReady(actorRef2);
 
+        ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
+                .actorSelection(actorRef3.path().toString());
+
+        doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
+                .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
+
+        expectReadyLocalTransaction(actorRef3, false);
+
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
 
         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
-                actorSelection(actorRef2));
+                actorSelection(actorRef2), actorSelection(actorRef3));
+
+        SortedSet<String> expShardNames =
+                ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
+                        TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
+
+        ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", true,
+                batchedMods.getValue().getParticipatingShardNames().isPresent());
+        assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
+
+        batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", true,
+                batchedMods.getValue().getParticipatingShardNames().isPresent());
+        assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
+
+        ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", true,
+                readyLocalTx.getValue().getParticipatingShardNames().isPresent());
+        assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
     }
 
     @Test
@@ -657,6 +698,12 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
         assertTrue(ready instanceof SingleCommitCohortProxy);
         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
+
+        ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", false,
+                readyLocalTx.getValue().getParticipatingShardNames().isPresent());
     }
 
     @Test
@@ -725,7 +772,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
-        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
+                TestModel.JUNK_QNAME.getLocalName());
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
index 1d4cc3d..b01c072 100644 (file)
@@ -9,7 +9,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.collect.ImmutableSortedSet;
 import java.io.Serializable;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -49,7 +52,9 @@ public class BatchedModificationsTest extends AbstractTest {
         batched.addModification(new WriteModification(writePath, writeData));
         batched.addModification(new MergeModification(mergePath, mergeData));
         batched.addModification(new DeleteModification(deletePath));
-        batched.setReady(true);
+        assertEquals("isReady", false, batched.isReady());
+        batched.setReady();
+        assertEquals("isReady", true, batched.isReady());
         batched.setTotalMessagesSent(5);
 
         BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
@@ -58,6 +63,8 @@ public class BatchedModificationsTest extends AbstractTest {
         assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
         assertEquals("getTransactionID", tx1, clone.getTransactionId());
         assertEquals("isReady", true, clone.isReady());
+        assertEquals("isDoCommitOnReady", false, clone.isDoCommitOnReady());
+        assertEquals("participatingShardNames present", false, clone.getParticipatingShardNames().isPresent());
         assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
 
         assertEquals("getModifications size", 3, clone.getModifications().size());
@@ -76,18 +83,49 @@ public class BatchedModificationsTest extends AbstractTest {
         assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
         assertEquals("getPath", deletePath, delete.getPath());
 
-        // Test with different params.
+        // Test with participating shard names.
+
         final TransactionIdentifier tx2 = nextTransactionId();
         batched = new BatchedModifications(tx2, (short)10000);
+        final SortedSet<String> shardNames = ImmutableSortedSet.of("one", "two");
+        batched.setReady(Optional.of(shardNames));
+        batched.setDoCommitOnReady(true);
+        assertEquals("isReady", true, batched.isReady());
 
         clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
 
         assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
         assertEquals("getTransactionID", tx2, clone.getTransactionId());
-        assertEquals("isReady", false, clone.isReady());
+        assertEquals("isReady", true, clone.isReady());
+        assertEquals("isDoCommitOnReady", true, clone.isDoCommitOnReady());
+        assertEquals("participatingShardNames present", true, clone.getParticipatingShardNames().isPresent());
+        assertEquals("participatingShardNames", shardNames, clone.getParticipatingShardNames().get());
+        assertEquals("getModifications size", 0, clone.getModifications().size());
+
+        // Test not ready.
 
+        batched = new BatchedModifications(tx2, DataStoreVersions.CURRENT_VERSION);
+
+        clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+        assertEquals("getTransactionID", tx2, clone.getTransactionId());
+        assertEquals("isReady", false, clone.isReady());
         assertEquals("getModifications size", 0, clone.getModifications().size());
 
+        // Test pre-Flourine
+
+        batched = new BatchedModifications(tx2, DataStoreVersions.BORON_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.setReady(Optional.of(ImmutableSortedSet.of("one", "two")));
+
+        clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+        assertEquals("getVersion", DataStoreVersions.BORON_VERSION, clone.getVersion());
+        assertEquals("getTransactionID", tx2, clone.getTransactionId());
+        assertEquals("isReady", true, clone.isReady());
+        assertEquals("participatingShardNames present", false, clone.getParticipatingShardNames().isPresent());
+        assertEquals("getModifications size", 1, clone.getModifications().size());
     }
 
     @Test
index 16e1ab3..257ce4f 100644 (file)
@@ -12,8 +12,11 @@ import static org.junit.Assert.assertNotNull;
 
 import akka.actor.ExtendedActorSystem;
 import akka.testkit.javadsl.TestKit;
+import com.google.common.collect.ImmutableSortedSet;
 import java.io.NotSerializableException;
 import java.util.List;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
@@ -48,8 +51,10 @@ public class ReadyLocalTransactionSerializerTest extends AbstractTest {
         MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
         new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
 
+        final SortedSet<String> shardNames = ImmutableSortedSet.of("one", "two");
         TransactionIdentifier txId = nextTransactionId();
-        ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+        ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true,
+                Optional.of(shardNames));
 
         final ExtendedActorSystem system = (ExtendedActorSystem) ExtendedActorSystem.create("test");
         final Object deserialized;
@@ -66,6 +71,10 @@ public class ReadyLocalTransactionSerializerTest extends AbstractTest {
         BatchedModifications batched = (BatchedModifications)deserialized;
         assertEquals("getTransactionID", txId, batched.getTransactionId());
         assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, batched.getVersion());
+        assertEquals("isReady", true, batched.isReady());
+        assertEquals("isDoCommitOnReady", true, batched.isDoCommitOnReady());
+        assertEquals("participatingShardNames present", true, batched.getParticipatingShardNames().isPresent());
+        assertEquals("participatingShardNames", shardNames, batched.getParticipatingShardNames().get());
 
         List<Modification> batchedMods = batched.getModifications();
         assertEquals("getModifications size", 2, batchedMods.size());
index 836d7f9..1f7c3fd 100644 (file)
@@ -60,6 +60,10 @@ public final class TestModel {
         return YangParserTestUtils.parseYangResource(DATASTORE_TEST_YANG);
     }
 
+    public static DataContainerChild<?, ?> outerMapNode() {
+        return ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME).build();
+    }
+
     public static DataContainerChild<?, ?> outerNode(final int... ids) {
         CollectionNodeBuilder<MapEntryNode, MapNode> outer = ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME);
         for (int id: ids) {
@@ -115,4 +119,8 @@ public final class TestModel {
     public static YangInstanceIdentifier innerEntryPath(final int id, final String name) {
         return OUTER_LIST_PATH.node(outerEntryKey(id)).node(INNER_LIST_QNAME).node(innerEntryKey(name));
     }
+
+    public static YangInstanceIdentifier innerMapPath(final int id) {
+        return OUTER_LIST_PATH.node(outerEntryKey(id)).node(INNER_LIST_QNAME);
+    }
 }
index 5b08475..abefdf0 100644 (file)
@@ -5,6 +5,7 @@ akka {
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
     actor {
+      warn-about-java-serializer-usage = false
     }
 }
 
@@ -63,6 +64,8 @@ test-config {
       serialization-bindings {
           "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
+
+      warn-about-java-serializer-usage = false
     }
     remote {
       log-remote-lifecycle-events = off
@@ -127,6 +130,8 @@ Member1 {
       serialization-bindings {
           "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
+
+      warn-about-java-serializer-usage = false
     }
     remote {
       log-remote-lifecycle-events = off
@@ -188,6 +193,8 @@ Member2 {
       serialization-bindings {
           "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
+
+      warn-about-java-serializer-usage = false
     }
     remote {
       log-remote-lifecycle-events = off
@@ -251,6 +258,8 @@ Member3 {
       serialization-bindings {
           "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
+
+      warn-about-java-serializer-usage = false
     }
     remote {
       log-remote-lifecycle-events = off
@@ -314,6 +323,8 @@ Member4 {
       serialization-bindings {
           "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
+
+      warn-about-java-serializer-usage = false
     }
     remote {
       log-remote-lifecycle-events = off
@@ -377,6 +388,8 @@ Member5 {
       serialization-bindings {
           "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
+
+      warn-about-java-serializer-usage = false
     }
     remote {
       log-remote-lifecycle-events = off
@@ -440,6 +453,8 @@ Member256 {
       serialization-bindings {
           "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
+
+      warn-about-java-serializer-usage = false
     }
     remote {
       log-remote-lifecycle-events = off

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.