X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=a343d7373f21ba1d748bba2ac879cf9737751780;hb=a5e45c08c64f450aef9f3bf2d1f98714d01db1ed;hp=9c65ed71d268e2d4a786ac680ccb744570b911dd;hpb=057b787289f7b909d7013c22ac73a1c91c860af8;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 9c65ed71d2..a343d7373f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -13,10 +13,12 @@ import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status.Failure; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Queue; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; @@ -158,13 +160,11 @@ class EntityOwnershipShardCommitCoordinator { } void commitModification(Modification modification, EntityOwnershipShard shard) { - BatchedModifications modifications = newBatchedModifications(); - modifications.addModification(modification); - commitModifications(modifications, shard); + commitModifications(ImmutableList.of(modification), shard); } - void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) { - if (modifications.getModifications().isEmpty()) { + void commitModifications(List modifications, EntityOwnershipShard shard) { + if (modifications.isEmpty()) { return; } @@ -175,9 +175,10 @@ class EntityOwnershipShardCommitCoordinator { inflightCommit != null ? "A commit is inflight" : "No shard leader"); } - pendingModifications.addAll(modifications.getModifications()); + pendingModifications.addAll(modifications); } else { - inflightCommit = modifications; + inflightCommit = newBatchedModifications(); + inflightCommit.addModifications(modifications); shard.tryCommitModifications(inflightCommit); } } @@ -221,22 +222,17 @@ class EntityOwnershipShardCommitCoordinator { } // Prune the subsequent pending modifications. - Iterator iter = pendingModifications.iterator(); - while (iter.hasNext()) { - Modification mod = iter.next(); - if (!canForwardModificationToNewLeader(mod)) { - iter.remove(); - } - } + pendingModifications.removeIf(mod -> !canForwardModificationToNewLeader(mod)); } } - @Nullable - private BatchedModifications pruneModifications(BatchedModifications toPrune) { + private @Nullable BatchedModifications pruneModifications(BatchedModifications toPrune) { BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(), toPrune.getVersion()); prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady()); - prunedModifications.setReady(toPrune.isReady()); + if (toPrune.isReady()) { + prunedModifications.setReady(toPrune.getParticipatingShardNames()); + } prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent()); for (Modification mod: toPrune.getModifications()) { if (canForwardModificationToNewLeader(mod)) { @@ -266,15 +262,15 @@ class EntityOwnershipShardCommitCoordinator { private void newInflightCommitWithDifferentTransactionID() { BatchedModifications newBatchedModifications = newBatchedModifications(); - newBatchedModifications.getModifications().addAll(inflightCommit.getModifications()); + newBatchedModifications.addModifications(inflightCommit.getModifications()); inflightCommit = newBatchedModifications; } - BatchedModifications newBatchedModifications() { + private BatchedModifications newBatchedModifications() { BatchedModifications modifications = new BatchedModifications( new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION); modifications.setDoCommitOnReady(true); - modifications.setReady(true); + modifications.setReady(); modifications.setTotalMessagesSent(1); return modifications; }