X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContext.java;h=27969b3e8ef405331c1e69fb1b9f912612d22538;hp=941c86116cb7926ebd13a6b55b79b32484bdd54e;hb=123afd8b015173c459f4937c84eb2e91286f65a8;hpb=615c470922b05ad9471f26ab5804df0d486c8828 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 941c86116c..27969b3e8e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -50,8 +50,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { */ private volatile Throwable failedModification; - protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor, - ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) { + protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, + final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) { super(identifier, remoteTransactionVersion); this.limiter = Preconditions.checkNotNull(limiter); this.actor = actor; @@ -75,27 +75,34 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public Future directCommit() { + public Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. - + bumpPermits(havePermit); return sendBatchedModifications(true, true); } @Override - public Future readyTransaction() { + public Future readyTransaction(final Boolean havePermit) { logModificationCount(); LOG.debug("Tx {} readyTransaction called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. + bumpPermits(havePermit); Future lastModificationsFuture = sendBatchedModifications(true, false); return transformReadyReply(lastModificationsFuture); } + private void bumpPermits(final Boolean havePermit) { + if (Boolean.TRUE.equals(havePermit)) { + ++batchPermits; + } + } + protected Future transformReadyReply(final Future readyReplyFuture) { // Transform the last reply Future into a Future that returns the cohort actor path from // the last reply message. That's the end result of the ready operation. @@ -107,7 +114,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return new BatchedModifications(getIdentifier(), getTransactionVersion()); } - private void batchModification(Modification modification, boolean havePermit) { + private void batchModification(final Modification modification, final boolean havePermit) { incrementModificationCount(); if (havePermit) { ++batchPermits; @@ -129,7 +136,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return sendBatchedModifications(false, false); } - protected Future sendBatchedModifications(boolean ready, boolean doCommitOnReady) { + protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) { Future sent = null; if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { if (batchedModifications == null) { @@ -167,7 +174,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { actorContext.getTransactionCommitOperationTimeout()); sent.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object success) { + public void onComplete(final Throwable failure, final Object success) { if (failure != null) { LOG.debug("Tx {} modifications failed", getIdentifier(), failure); failedModification = failure; @@ -183,16 +190,23 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void executeModification(AbstractModification modification) { + public void executeModification(final AbstractModification modification, final Boolean havePermit) { LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(), modification.getPath()); - final boolean havePermit = failedModification == null && acquireOperation(); - batchModification(modification, havePermit); + final boolean permitToRelease; + if (havePermit == null) { + permitToRelease = failedModification == null && acquireOperation(); + } else { + permitToRelease = havePermit.booleanValue(); + } + + batchModification(modification, permitToRelease); } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture) { + public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, + final Boolean havePermit) { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); @@ -209,14 +223,14 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - final boolean havePermit = acquireOperation(); + final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue(); sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { // We have previously acquired an operation, now release it, no matter what happened - if (havePermit) { + if (permitToRelease) { limiter.release(); } @@ -245,15 +259,15 @@ public class RemoteTransactionContext extends AbstractTransactionContext { * @return True if a permit was successfully acquired, false otherwise */ private boolean acquireOperation() { - if (isOperationHandOffComplete()) { - if (limiter.acquire()) { - return true; - } + Preconditions.checkState(isOperationHandOffComplete(), + "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff", + getIdentifier(), actor); - LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), - actor); + if (limiter.acquire()) { + return true; } + LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor); return false; }