BUG-8941: enqueue purges once ask-based transactions resolve
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 2773a3e3bfd21ad4559bb6baf05c1d48661cfe9c..080d3eec23a22ef3f7cb9771d78f42e905d08dc1 100644 (file)
@@ -23,8 +23,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -188,13 +186,13 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
-        final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(),
-            message.getModification());
+        final TransactionIdentifier txId = message.getTransactionId();
+        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
-        log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId());
+        log.debug("{}: Applying local modifications for Tx {}", name, txId);
 
         if (message.isDoCommitOnReady()) {
             cohortEntry.setReplySender(sender);
@@ -287,7 +285,7 @@ final class ShardCommitCoordinator {
         handleCanCommit(cohortEntry);
     }
 
-    private void doCommit(final CohortEntry cohortEntry) {
+    void doCommit(final CohortEntry cohortEntry) {
         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
 
         // We perform the preCommit phase here atomically with the commit phase. This is an
@@ -311,7 +309,7 @@ final class ShardCommitCoordinator {
         });
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+    void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
         log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
 
         cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@@ -320,6 +318,7 @@ final class ShardCommitCoordinator {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@@ -328,8 +327,9 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                        cohortEntry.getTransactionId(), failure);
+                final TransactionIdentifier txId = cohortEntry.getTransactionId();
+                log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
@@ -370,21 +370,28 @@ final class ShardCommitCoordinator {
         log.debug("{}: Aborting transaction {}", name, transactionID);
 
         final ActorRef self = shard.getSelf();
-        try {
-            cohortEntry.abort();
-
-            shard.getShardMBean().incrementAbortTransactionsCount();
+        cohortEntry.abort(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
 
-            if (sender != null) {
-                sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+                if (sender != null) {
+                    sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+                }
             }
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.error("{}: An exception happened during abort", name, e);
 
-            if (sender != null) {
-                sender.tell(new Failure(e), self);
+            @Override
+            public void onFailure(final Throwable failure) {
+                log.error("{}: An exception happened during abort", name, failure);
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
+
+                if (sender != null) {
+                    sender.tell(new Failure(failure), self);
+                }
             }
-        }
+        });
+
+        shard.getShardMBean().incrementAbortTransactionsCount();
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {