Fix shard deadlock in 3 nodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContext.java
index 27969b3e8ef405331c1e69fb1b9f912612d22538..ce4bda74cfa7357a8114d84a06f58fc49a35c36c 100644 (file)
@@ -13,6 +13,8 @@ import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -80,11 +82,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
         // Send the remaining batched modifications, if any, with the ready flag set.
         bumpPermits(havePermit);
-        return sendBatchedModifications(true, true);
+        return sendBatchedModifications(true, true, Optional.empty());
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+    public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+            final Optional<SortedSet<String>> participatingShardNames) {
         logModificationCount();
 
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
@@ -92,7 +95,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send the remaining batched modifications, if any, with the ready flag set.
 
         bumpPermits(havePermit);
-        Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
 
         return transformReadyReply(lastModificationsFuture);
     }
@@ -133,10 +136,11 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     protected Future<Object> sendBatchedModifications() {
-        return sendBatchedModifications(false, false);
+        return sendBatchedModifications(false, false, Optional.empty());
     }
 
-    protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
+    protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
+            final Optional<SortedSet<String>> participatingShardNames) {
         Future<Object> sent = null;
         if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
             if (batchedModifications == null) {
@@ -146,7 +150,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                     batchedModifications.getModifications().size(), ready);
 
-            batchedModifications.setReady(ready);
             batchedModifications.setDoCommitOnReady(doCommitOnReady);
             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
 
@@ -155,6 +158,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             batchPermits = 0;
 
             if (ready) {
+                batchedModifications.setReady(participatingShardNames);
+                batchedModifications.setDoCommitOnReady(doCommitOnReady);
                 batchedModifications = null;
             } else {
                 batchedModifications = newBatchedModifications();