X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=ad99fce9468951111b7fdf66c2a5c11d5478a90c;hp=eebad9ce065c4fbb98e3f1a7c53ecb122c9cb5a0;hb=bdce894fa73714aa9f68eadad3524cfc94dc71d2;hpb=a6af137c30470b86d4bc624d4c48cb686495a182 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index eebad9ce06..ad99fce946 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -22,7 +22,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; -import javax.annotation.Nonnull; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -129,7 +129,7 @@ final class ShardCommitCoordinator { */ void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId()); - if (cohortEntry == null) { + if (cohortEntry == null || cohortEntry.isSealed()) { cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()), batched.getVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -156,8 +156,8 @@ final class ShardCommitCoordinator { } if (log.isDebugEnabled()) { - log.debug("{}: Readying Tx {}, client version {}", name, - batched.getTransactionId(), batched.getVersion()); + log.debug("{}: Readying Tx {} of {} operations, client version {}", name, + batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion()); } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); @@ -251,8 +251,8 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.debug("{}: An exception occurred during canCommit for {}: {}", name, - cohortEntry.getTransactionId(), failure); + log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(), + failure); cohortCache.remove(cohortEntry.getTransactionId()); cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); @@ -276,7 +276,7 @@ final class ShardCommitCoordinator { // between canCommit and ready and the entry was expired from the cache or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -311,7 +311,7 @@ final class ShardCommitCoordinator { }); } - void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) { + void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) { log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId()); cohortEntry.commit(new FutureCallback() { @@ -320,7 +320,6 @@ final class ShardCommitCoordinator { final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result, sender); - cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), @@ -331,7 +330,6 @@ final class ShardCommitCoordinator { public void onFailure(final Throwable failure) { final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure); - cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(new Failure(failure), cohortEntry.getShard().self()); @@ -353,7 +351,7 @@ final class ShardCommitCoordinator { // or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -375,8 +373,6 @@ final class ShardCommitCoordinator { cohortEntry.abort(new FutureCallback() { @Override public void onSuccess(final Void result) { - shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); - if (sender != null) { sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } @@ -385,7 +381,6 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { log.error("{}: An exception happened during abort", name, failure); - shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); if (sender != null) { sender.tell(new Failure(failure), self);