X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=bf5d271bfc418a87288f2760b1d6d73233ae9e30;hb=d7c9a8ccfcb57f005490a226803d094289997ef9;hp=2773a3e3bfd21ad4559bb6baf05c1d48661cfe9c;hpb=057b787289f7b909d7013c22ac73a1c91c860af8;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 2773a3e3bf..bf5d271bfc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -23,8 +23,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -188,13 +186,13 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { - final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(), - message.getModification()); + final TransactionIdentifier txId = message.getTransactionId(); + final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); - log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId()); + log.debug("{}: Applying local modifications for Tx {}", name, txId); if (message.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -287,7 +285,7 @@ final class ShardCommitCoordinator { handleCanCommit(cohortEntry); } - private void doCommit(final CohortEntry cohortEntry) { + void doCommit(final CohortEntry cohortEntry) { log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId()); // We perform the preCommit phase here atomically with the commit phase. This is an @@ -311,7 +309,7 @@ final class ShardCommitCoordinator { }); } - private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) { + void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) { log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId()); cohortEntry.commit(new FutureCallback() { @@ -370,21 +368,25 @@ final class ShardCommitCoordinator { log.debug("{}: Aborting transaction {}", name, transactionID); final ActorRef self = shard.getSelf(); - try { - cohortEntry.abort(); + cohortEntry.abort(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + if (sender != null) { + sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); + } + } - shard.getShardMBean().incrementAbortTransactionsCount(); + @Override + public void onFailure(final Throwable failure) { + log.error("{}: An exception happened during abort", name, failure); - if (sender != null) { - sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); + if (sender != null) { + sender.tell(new Failure(failure), self); + } } - } catch (InterruptedException | ExecutionException | TimeoutException e) { - log.error("{}: An exception happened during abort", name, e); + }); - if (sender != null) { - sender.tell(new Failure(e), self); - } - } + shard.getShardMBean().incrementAbortTransactionsCount(); } void checkForExpiredTransactions(final long timeout, final Shard shard) {