Fix findbugs warnings
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 080d3eec23a22ef3f7cb9771d78f42e905d08dc1..403a96819f392726d7b24ef715827e1187f29841 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -103,7 +102,7 @@ final class ShardCommitCoordinator {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionId(), ready.getTxnClientVersion());
 
-        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
 
@@ -162,7 +161,7 @@ final class ShardCommitCoordinator {
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
 
             if (batched.isDoCommitOnReady()) {
                 cohortEntry.setReplySender(sender);
@@ -187,7 +186,8 @@ final class ShardCommitCoordinator {
      */
     void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
         final TransactionIdentifier txId = message.getTransactionId();
-        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
+        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+                message.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
@@ -227,7 +227,9 @@ final class ShardCommitCoordinator {
 
         BatchedModifications last = newModifications.getLast();
         last.setDoCommitOnReady(from.isDoCommitOnReady());
-        last.setReady(from.isReady());
+        if (from.isReady()) {
+            last.setReady(from.getParticipatingShardNames());
+        }
         last.setTotalMessagesSent(newModifications.size());
         return newModifications;
     }
@@ -249,8 +251,8 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.debug("{}: An exception occurred during canCommit for {}: {}", name,
-                        cohortEntry.getTransactionId(), failure);
+                log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+                    failure);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
@@ -274,7 +276,7 @@ final class ShardCommitCoordinator {
             // between canCommit and ready and the entry was expired from the cache or it was aborted.
             IllegalStateException ex = new IllegalStateException(
                     String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
-            log.error(ex.getMessage());
+            log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
             sender.tell(new Failure(ex), shard.self());
             return;
         }
@@ -351,7 +353,7 @@ final class ShardCommitCoordinator {
             // or it was aborted.
             IllegalStateException ex = new IllegalStateException(
                     String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
-            log.error(ex.getMessage());
+            log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
             sender.tell(new Failure(ex), shard.self());
             return;
         }
@@ -395,13 +397,7 @@ final class ShardCommitCoordinator {
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {
-        Iterator<CohortEntry> iter = cohortCache.values().iterator();
-        while (iter.hasNext()) {
-            CohortEntry cohortEntry = iter.next();
-            if (cohortEntry.isFailed()) {
-                iter.remove();
-            }
-        }
+        cohortCache.values().removeIf(CohortEntry::isFailed);
     }
 
     void abortPendingTransactions(final String reason, final Shard shard) {
@@ -454,7 +450,7 @@ final class ShardCommitCoordinator {
             if (last != null) {
                 final boolean immediate = cohortEntry.isDoImmediateCommit();
                 last.setDoCommitOnReady(immediate);
-                last.setReady(true);
+                last.setReady(cohortEntry.getParticipatingShardNames());
                 last.setTotalMessagesSent(newMessages.size());
 
                 messages.addAll(newMessages);