Simplify code using Java 8 features
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 080d3eec23a22ef3f7cb9771d78f42e905d08dc1..eebad9ce065c4fbb98e3f1a7c53ecb122c9cb5a0 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -103,7 +102,7 @@ final class ShardCommitCoordinator {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionId(), ready.getTxnClientVersion());
 
-        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
 
@@ -162,7 +161,7 @@ final class ShardCommitCoordinator {
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
 
             if (batched.isDoCommitOnReady()) {
                 cohortEntry.setReplySender(sender);
@@ -187,7 +186,8 @@ final class ShardCommitCoordinator {
      */
     void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
         final TransactionIdentifier txId = message.getTransactionId();
-        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
+        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+                message.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
@@ -227,7 +227,9 @@ final class ShardCommitCoordinator {
 
         BatchedModifications last = newModifications.getLast();
         last.setDoCommitOnReady(from.isDoCommitOnReady());
-        last.setReady(from.isReady());
+        if (from.isReady()) {
+            last.setReady(from.getParticipatingShardNames());
+        }
         last.setTotalMessagesSent(newModifications.size());
         return newModifications;
     }
@@ -395,13 +397,7 @@ final class ShardCommitCoordinator {
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {
-        Iterator<CohortEntry> iter = cohortCache.values().iterator();
-        while (iter.hasNext()) {
-            CohortEntry cohortEntry = iter.next();
-            if (cohortEntry.isFailed()) {
-                iter.remove();
-            }
-        }
+        cohortCache.values().removeIf(CohortEntry::isFailed);
     }
 
     void abortPendingTransactions(final String reason, final Shard shard) {
@@ -454,7 +450,7 @@ final class ShardCommitCoordinator {
             if (last != null) {
                 final boolean immediate = cohortEntry.isDoImmediateCommit();
                 last.setDoCommitOnReady(immediate);
-                last.setReady(true);
+                last.setReady(cohortEntry.getParticipatingShardNames());
                 last.setTotalMessagesSent(newMessages.size());
 
                 messages.addAll(newMessages);