X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=a6958297443e9cb7e635f300a96c1b3346e715e6;hb=a3cecfd01d0ef8922530924e3ee9684eb03ee5d6;hp=6c15ef6ed05cb2fb41b7b837e3655b0fef167beb;hpb=81cc10db365aa8cde38a3d2777488bb83bd69ef5;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 6c15ef6ed0..a695829744 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -7,18 +7,28 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; + import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status.Failure; +import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -28,29 +38,37 @@ import scala.concurrent.duration.FiniteDuration; * @author Thomas Pantelis */ class EntityOwnershipShardCommitCoordinator { - private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry"; + private static final Object COMMIT_RETRY_MESSAGE = new Object() { + @Override + public String toString() { + return "entityCommitRetry"; + } + }; + private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal"); - private final Logger log; - private int transactionIDCounter = 0; - private final String localMemberName; private final Queue pendingModifications = new LinkedList<>(); + private final LocalHistoryIdentifier historyId; + private final Logger log; + private BatchedModifications inflightCommit; private Cancellable retryCommitSchedule; + private long transactionIDCounter = 0; - EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) { - this.localMemberName = localMemberName; - this.log = log; + EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) { + this.log = Preconditions.checkNotNull(log); + historyId = new LocalHistoryIdentifier( + ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0); } boolean handleMessage(Object message, EntityOwnershipShard shard) { boolean handled = true; - if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) { + if (CommitTransactionReply.isSerializedType(message)) { // Successful reply from a local commit. inflightCommitSucceeded(shard); - } else if(message instanceof akka.actor.Status.Failure) { + } else if (message instanceof akka.actor.Status.Failure) { // Failure reply from a local commit. - inflightCommitFailure(((Failure)message).cause(), shard); - } else if(message.equals(COMMIT_RETRY_MESSAGE)) { + inflightCommitFailure(((Failure) message).cause(), shard); + } else if (COMMIT_RETRY_MESSAGE.equals(message)) { retryInflightCommit(shard); } else { handled = false; @@ -61,12 +79,12 @@ class EntityOwnershipShardCommitCoordinator { private void retryInflightCommit(EntityOwnershipShard shard) { // Shouldn't be null happen but verify anyway - if(inflightCommit == null) { + if (inflightCommit == null) { return; } - if(shard.hasLeader()) { - log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID()); + if (shard.hasLeader()) { + log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionId()); shard.tryCommitModifications(inflightCommit); } else { @@ -76,13 +94,13 @@ class EntityOwnershipShardCommitCoordinator { void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) { // This should've originated from a failed inflight commit but verify anyway - if(inflightCommit == null) { + if (inflightCommit == null) { return; } - log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause); + log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionId(), cause); - if(!(cause instanceof NoShardLeaderException)) { + if (!(cause instanceof NoShardLeaderException)) { // If the failure is other than NoShardLeaderException the commit may have been partially // processed so retry with a new transaction ID to be safe. newInflightCommitWithDifferentTransactionID(); @@ -95,7 +113,7 @@ class EntityOwnershipShardCommitCoordinator { FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval(); log.debug("Scheduling retry for BatchedModifications commit {} in {}", - inflightCommit.getTransactionID(), duration); + inflightCommit.getTransactionId(), duration); retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(), COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender()); @@ -103,66 +121,78 @@ class EntityOwnershipShardCommitCoordinator { void inflightCommitSucceeded(EntityOwnershipShard shard) { // Shouldn't be null but verify anyway - if(inflightCommit == null) { + if (inflightCommit == null) { return; } - if(retryCommitSchedule != null) { + if (retryCommitSchedule != null) { retryCommitSchedule.cancel(); } - log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID()); + log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionId()); inflightCommit = null; commitNextBatch(shard); } void commitNextBatch(EntityOwnershipShard shard) { - if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) { + if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) { return; } inflightCommit = newBatchedModifications(); Iterator iter = pendingModifications.iterator(); - while(iter.hasNext()) { + while (iter.hasNext()) { inflightCommit.addModification(iter.next()); iter.remove(); - if(inflightCommit.getModifications().size() >= - shard.getDatastoreContext().getShardBatchedModificationCount()) { + if (inflightCommit.getModifications().size() + >= shard.getDatastoreContext().getShardBatchedModificationCount()) { break; } } - log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(), + log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionId(), inflightCommit.getModifications().size()); shard.tryCommitModifications(inflightCommit); } void commitModification(Modification modification, EntityOwnershipShard shard) { + BatchedModifications modifications = newBatchedModifications(); + modifications.addModification(modification); + commitModifications(modifications, shard); + } + + void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) { + if (modifications.getModifications().isEmpty()) { + return; + } + boolean hasLeader = shard.hasLeader(); - if(inflightCommit != null || !hasLeader) { - if(log.isDebugEnabled()) { - log.debug("{} - adding modification to pending", - (inflightCommit != null ? "A commit is inflight" : "No shard leader")); + if (inflightCommit != null || !hasLeader) { + if (log.isDebugEnabled()) { + log.debug("{} - adding modifications to pending", + inflightCommit != null ? "A commit is inflight" : "No shard leader"); } - pendingModifications.add(modification); + pendingModifications.addAll(modifications.getModifications()); } else { - inflightCommit = newBatchedModifications(); - inflightCommit.addModification(modification); - + inflightCommit = modifications; shard.tryCommitModifications(inflightCommit); } } void onStateChanged(EntityOwnershipShard shard, boolean isLeader) { - if(!isLeader && inflightCommit != null) { + shard.possiblyRemoveAllInitialCandidates(shard.getLeader()); + + possiblyPrunePendingCommits(shard, isLeader); + + if (!isLeader && inflightCommit != null) { // We're no longer the leader but we have an inflight local commit. This likely means we didn't get // consensus for the commit and switched to follower due to another node with a higher term. We // can't be sure if the commit was replicated to any node so we retry it here with a new // transaction ID. - if(retryCommitSchedule != null) { + if (retryCommitSchedule != null) { retryCommitSchedule.cancel(); } @@ -173,16 +203,76 @@ class EntityOwnershipShardCommitCoordinator { } } + private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) { + // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader. + // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the + // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not + // forward such commits thus we prune the pending modifications. We still should forward local candidate change + // commits. + if (shard.hasLeader() && !isLeader) { + // We may have already submitted a transaction for replication and commit. We don't need the base Shard to + // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear + // pending transactions and drop them. + shard.convertPendingTransactionsToMessages(); + + // Prune the inflightCommit. + if (inflightCommit != null) { + inflightCommit = pruneModifications(inflightCommit); + } + + // Prune the subsequent pending modifications. + Iterator iter = pendingModifications.iterator(); + while (iter.hasNext()) { + Modification mod = iter.next(); + if (!canForwardModificationToNewLeader(mod)) { + iter.remove(); + } + } + } + } + + @Nullable + private BatchedModifications pruneModifications(BatchedModifications toPrune) { + BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(), + toPrune.getVersion()); + prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady()); + prunedModifications.setReady(toPrune.isReady()); + prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent()); + for (Modification mod: toPrune.getModifications()) { + if (canForwardModificationToNewLeader(mod)) { + prunedModifications.addModification(mod); + } + } + + return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null; + } + + private boolean canForwardModificationToNewLeader(Modification mod) { + // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used + // to determine the new owner might be stale. + if (mod instanceof WriteModification) { + WriteModification writeMod = (WriteModification)mod; + boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME); + + if (!canForward) { + log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath()); + } + + return canForward; + } + + return true; + } + private void newInflightCommitWithDifferentTransactionID() { BatchedModifications newBatchedModifications = newBatchedModifications(); - newBatchedModifications.getModifications().addAll(inflightCommit.getModifications()); + newBatchedModifications.addModifications(inflightCommit.getModifications()); inflightCommit = newBatchedModifications; } - private BatchedModifications newBatchedModifications() { + BatchedModifications newBatchedModifications() { BatchedModifications modifications = new BatchedModifications( - TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(), - DataStoreVersions.CURRENT_VERSION, ""); + new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION); modifications.setDoCommitOnReady(true); modifications.setReady(true); modifications.setTotalMessagesSent(1);