X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=c792cf1dda996370b52002fa952b5689088d73df;hb=07c96b0fa318b7bf559df4954f705d06a44f1354;hp=97e6c62037b2801e1ea1e6521cc8b77597be3335;hpb=4d1709660b7af992d4c382a2a38debb5c7d64fb9;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 97e6c62037..c792cf1dda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; + import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status.Failure; @@ -14,6 +16,7 @@ import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; @@ -25,6 +28,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -168,7 +172,7 @@ class EntityOwnershipShardCommitCoordinator { if(inflightCommit != null || !hasLeader) { if(log.isDebugEnabled()) { log.debug("{} - adding modifications to pending", - (inflightCommit != null ? "A commit is inflight" : "No shard leader")); + inflightCommit != null ? "A commit is inflight" : "No shard leader"); } pendingModifications.addAll(modifications.getModifications()); @@ -179,6 +183,10 @@ class EntityOwnershipShardCommitCoordinator { } void onStateChanged(EntityOwnershipShard shard, boolean isLeader) { + shard.possiblyRemoveAllInitialCandidates(shard.getLeader()); + + possiblyPrunePendingCommits(shard, isLeader); + if(!isLeader && inflightCommit != null) { // We're no longer the leader but we have an inflight local commit. This likely means we didn't get // consensus for the commit and switched to follower due to another node with a higher term. We @@ -195,6 +203,66 @@ class EntityOwnershipShardCommitCoordinator { } } + private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) { + // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader. + // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the + // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not + // forward such commits thus we prune the pending modifications. We still should forward local candidate change + // commits. + if (shard.hasLeader() && !isLeader) { + // We may have already submitted a transaction for replication and commit. We don't need the base Shard to + // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear + // pending transactions and drop them. + shard.convertPendingTransactionsToMessages(); + + // Prune the inflightCommit. + if(inflightCommit != null) { + inflightCommit = pruneModifications(inflightCommit); + } + + // Prune the subsequent pending modifications. + Iterator iter = pendingModifications.iterator(); + while(iter.hasNext()) { + Modification mod = iter.next(); + if(!canForwardModificationToNewLeader(mod)) { + iter.remove(); + } + } + } + } + + @Nullable + private BatchedModifications pruneModifications(BatchedModifications toPrune) { + BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionID(), toPrune.getVersion()); + prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady()); + prunedModifications.setReady(toPrune.isReady()); + prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent()); + for(Modification mod: toPrune.getModifications()) { + if(canForwardModificationToNewLeader(mod)) { + prunedModifications.addModification(mod); + } + } + + return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null; + } + + private boolean canForwardModificationToNewLeader(Modification mod) { + // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used + // to determine the new owner might be stale. + if (mod instanceof WriteModification) { + WriteModification writeMod = (WriteModification)mod; + boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME); + + if (!canForward) { + log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath()); + } + + return canForward; + } + + return true; + } + private void newInflightCommitWithDifferentTransactionID() { BatchedModifications newBatchedModifications = newBatchedModifications(); newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());