CDS: Changes to Tx abort in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index ce3cae481e766f4ba4b7f93593e8d12a282ee294..5a10c3e961bc8e038a0dd3969134b903aace2a56 100644 (file)
@@ -18,9 +18,6 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactio
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -83,7 +79,7 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+    protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
     static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
@@ -326,7 +322,7 @@ public class Shard extends RaftActor {
     }
 
     void continueCommit(final CohortEntry cohortEntry) throws Exception {
-        final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+        final DataTreeCandidate candidate = cohortEntry.getCandidate();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
@@ -349,10 +345,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().commit().get();
+            cohortEntry.commit();
 
             sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
@@ -385,7 +378,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 try {
-                    store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
                 } catch (DataValidationFailedException e) {
                     shardMBean.incrementFailedTransactionsCount();
                     LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
@@ -494,38 +487,7 @@ public class Shard extends RaftActor {
     }
 
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry != null) {
-            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
-
-            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
-            // aborted during replication in which case we may still commit locally if replication
-            // succeeds.
-            commitCoordinator.currentTransactionComplete(transactionID, false);
-
-            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
-            final ActorRef self = getSelf();
-
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void v) {
-                    shardMBean.incrementAbortTransactionsCount();
-
-                    if(sender != null) {
-                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable t) {
-                    LOG.error("{}: An exception happened during abort", persistenceId(), t);
-
-                    if(sender != null) {
-                        sender.tell(new akka.actor.Status.Failure(t), self);
-                    }
-                }
-            });
-        }
+        commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {