X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=eaca661619fad6c63d2a5339c997bd7fc26a171f;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hp=a6958297443e9cb7e635f300a96c1b3346e715e6;hpb=8f2b69d0944fe8ebc7e4300bb7a05265c7c290d7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index a695829744..eaca661619 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -7,16 +7,18 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static java.util.Objects.requireNonNull; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status.Failure; -import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Queue; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; @@ -54,13 +56,13 @@ class EntityOwnershipShardCommitCoordinator { private Cancellable retryCommitSchedule; private long transactionIDCounter = 0; - EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) { - this.log = Preconditions.checkNotNull(log); + EntityOwnershipShardCommitCoordinator(final MemberName localMemberName, final Logger log) { + this.log = requireNonNull(log); historyId = new LocalHistoryIdentifier( ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0); } - boolean handleMessage(Object message, EntityOwnershipShard shard) { + boolean handleMessage(final Object message, final EntityOwnershipShard shard) { boolean handled = true; if (CommitTransactionReply.isSerializedType(message)) { // Successful reply from a local commit. @@ -77,7 +79,7 @@ class EntityOwnershipShardCommitCoordinator { return handled; } - private void retryInflightCommit(EntityOwnershipShard shard) { + private void retryInflightCommit(final EntityOwnershipShard shard) { // Shouldn't be null happen but verify anyway if (inflightCommit == null) { return; @@ -92,7 +94,7 @@ class EntityOwnershipShardCommitCoordinator { } } - void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) { + void inflightCommitFailure(final Throwable cause, final EntityOwnershipShard shard) { // This should've originated from a failed inflight commit but verify anyway if (inflightCommit == null) { return; @@ -109,7 +111,7 @@ class EntityOwnershipShardCommitCoordinator { scheduleInflightCommitRetry(shard); } - private void scheduleInflightCommitRetry(EntityOwnershipShard shard) { + private void scheduleInflightCommitRetry(final EntityOwnershipShard shard) { FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval(); log.debug("Scheduling retry for BatchedModifications commit {} in {}", @@ -119,7 +121,7 @@ class EntityOwnershipShardCommitCoordinator { COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender()); } - void inflightCommitSucceeded(EntityOwnershipShard shard) { + void inflightCommitSucceeded(final EntityOwnershipShard shard) { // Shouldn't be null but verify anyway if (inflightCommit == null) { return; @@ -135,7 +137,7 @@ class EntityOwnershipShardCommitCoordinator { commitNextBatch(shard); } - void commitNextBatch(EntityOwnershipShard shard) { + void commitNextBatch(final EntityOwnershipShard shard) { if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) { return; } @@ -157,14 +159,12 @@ class EntityOwnershipShardCommitCoordinator { shard.tryCommitModifications(inflightCommit); } - void commitModification(Modification modification, EntityOwnershipShard shard) { - BatchedModifications modifications = newBatchedModifications(); - modifications.addModification(modification); - commitModifications(modifications, shard); + void commitModification(final Modification modification, final EntityOwnershipShard shard) { + commitModifications(ImmutableList.of(modification), shard); } - void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) { - if (modifications.getModifications().isEmpty()) { + void commitModifications(final List modifications, final EntityOwnershipShard shard) { + if (modifications.isEmpty()) { return; } @@ -175,14 +175,15 @@ class EntityOwnershipShardCommitCoordinator { inflightCommit != null ? "A commit is inflight" : "No shard leader"); } - pendingModifications.addAll(modifications.getModifications()); + pendingModifications.addAll(modifications); } else { - inflightCommit = modifications; + inflightCommit = newBatchedModifications(); + inflightCommit.addModifications(modifications); shard.tryCommitModifications(inflightCommit); } } - void onStateChanged(EntityOwnershipShard shard, boolean isLeader) { + void onStateChanged(final EntityOwnershipShard shard, final boolean isLeader) { shard.possiblyRemoveAllInitialCandidates(shard.getLeader()); possiblyPrunePendingCommits(shard, isLeader); @@ -203,7 +204,7 @@ class EntityOwnershipShardCommitCoordinator { } } - private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) { + private void possiblyPrunePendingCommits(final EntityOwnershipShard shard, final boolean isLeader) { // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader. // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not @@ -221,22 +222,17 @@ class EntityOwnershipShardCommitCoordinator { } // Prune the subsequent pending modifications. - Iterator iter = pendingModifications.iterator(); - while (iter.hasNext()) { - Modification mod = iter.next(); - if (!canForwardModificationToNewLeader(mod)) { - iter.remove(); - } - } + pendingModifications.removeIf(mod -> !canForwardModificationToNewLeader(mod)); } } - @Nullable - private BatchedModifications pruneModifications(BatchedModifications toPrune) { + private @Nullable BatchedModifications pruneModifications(final BatchedModifications toPrune) { BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(), toPrune.getVersion()); prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady()); - prunedModifications.setReady(toPrune.isReady()); + if (toPrune.isReady()) { + prunedModifications.setReady(toPrune.getParticipatingShardNames()); + } prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent()); for (Modification mod: toPrune.getModifications()) { if (canForwardModificationToNewLeader(mod)) { @@ -247,7 +243,7 @@ class EntityOwnershipShardCommitCoordinator { return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null; } - private boolean canForwardModificationToNewLeader(Modification mod) { + private boolean canForwardModificationToNewLeader(final Modification mod) { // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used // to determine the new owner might be stale. if (mod instanceof WriteModification) { @@ -270,11 +266,11 @@ class EntityOwnershipShardCommitCoordinator { inflightCommit = newBatchedModifications; } - BatchedModifications newBatchedModifications() { + private BatchedModifications newBatchedModifications() { BatchedModifications modifications = new BatchedModifications( new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION); modifications.setDoCommitOnReady(true); - modifications.setReady(true); + modifications.setReady(); modifications.setTotalMessagesSent(1); return modifications; }