Fix illegal check in CreateTransactionReply
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 33634b1d6c3e322642e9b458e05d59feb74a577e..403a96819f392726d7b24ef715827e1187f29841 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -103,7 +102,7 @@ final class ShardCommitCoordinator {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionId(), ready.getTxnClientVersion());
 
-        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
 
@@ -162,7 +161,7 @@ final class ShardCommitCoordinator {
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
 
             if (batched.isDoCommitOnReady()) {
                 cohortEntry.setReplySender(sender);
@@ -186,13 +185,14 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
-        final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(),
-            message.getModification());
+        final TransactionIdentifier txId = message.getTransactionId();
+        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+                message.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
-        log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId());
+        log.debug("{}: Applying local modifications for Tx {}", name, txId);
 
         if (message.isDoCommitOnReady()) {
             cohortEntry.setReplySender(sender);
@@ -227,7 +227,9 @@ final class ShardCommitCoordinator {
 
         BatchedModifications last = newModifications.getLast();
         last.setDoCommitOnReady(from.isDoCommitOnReady());
-        last.setReady(from.isReady());
+        if (from.isReady()) {
+            last.setReady(from.getParticipatingShardNames());
+        }
         last.setTotalMessagesSent(newModifications.size());
         return newModifications;
     }
@@ -249,8 +251,8 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.debug("{}: An exception occurred during canCommit for {}: {}", name,
-                        cohortEntry.getTransactionId(), failure);
+                log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+                    failure);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
@@ -274,7 +276,7 @@ final class ShardCommitCoordinator {
             // between canCommit and ready and the entry was expired from the cache or it was aborted.
             IllegalStateException ex = new IllegalStateException(
                     String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
-            log.error(ex.getMessage());
+            log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
             sender.tell(new Failure(ex), shard.self());
             return;
         }
@@ -285,7 +287,7 @@ final class ShardCommitCoordinator {
         handleCanCommit(cohortEntry);
     }
 
-    private void doCommit(final CohortEntry cohortEntry) {
+    void doCommit(final CohortEntry cohortEntry) {
         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
 
         // We perform the preCommit phase here atomically with the commit phase. This is an
@@ -309,7 +311,7 @@ final class ShardCommitCoordinator {
         });
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+    void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
         log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
 
         cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@@ -318,6 +320,7 @@ final class ShardCommitCoordinator {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@@ -326,8 +329,9 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                        cohortEntry.getTransactionId(), failure);
+                final TransactionIdentifier txId = cohortEntry.getTransactionId();
+                log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
@@ -349,7 +353,7 @@ final class ShardCommitCoordinator {
             // or it was aborted.
             IllegalStateException ex = new IllegalStateException(
                     String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
-            log.error(ex.getMessage());
+            log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
             sender.tell(new Failure(ex), shard.self());
             return;
         }
@@ -371,6 +375,8 @@ final class ShardCommitCoordinator {
         cohortEntry.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
+
                 if (sender != null) {
                     sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
                 }
@@ -379,6 +385,7 @@ final class ShardCommitCoordinator {
             @Override
             public void onFailure(final Throwable failure) {
                 log.error("{}: An exception happened during abort", name, failure);
+                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
 
                 if (sender != null) {
                     sender.tell(new Failure(failure), self);
@@ -390,13 +397,7 @@ final class ShardCommitCoordinator {
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {
-        Iterator<CohortEntry> iter = cohortCache.values().iterator();
-        while (iter.hasNext()) {
-            CohortEntry cohortEntry = iter.next();
-            if (cohortEntry.isFailed()) {
-                iter.remove();
-            }
-        }
+        cohortCache.values().removeIf(CohortEntry::isFailed);
     }
 
     void abortPendingTransactions(final String reason, final Shard shard) {
@@ -449,7 +450,7 @@ final class ShardCommitCoordinator {
             if (last != null) {
                 final boolean immediate = cohortEntry.isDoImmediateCommit();
                 last.setDoCommitOnReady(immediate);
-                last.setReady(true);
+                last.setReady(cohortEntry.getParticipatingShardNames());
                 last.setTotalMessagesSent(newMessages.size());
 
                 messages.addAll(newMessages);