X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContext.java;h=7c45c542bd1422887c5c96cd9f53ce3ce81e9268;hb=refs%2Fchanges%2F57%2F68757%2F6;hp=ba64e29d7542ec2027c4d6f1cbabd11d54bd5af0;hpb=61280eab802fa28338f8f264f958f97c1e16e3ca;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index ba64e29d75..7c45c542bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -9,15 +9,14 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; -import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -41,6 +40,15 @@ public class RemoteTransactionContext extends AbstractTransactionContext { private BatchedModifications batchedModifications; private int totalBatchedModificationsSent; + private int batchPermits; + + /** + * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend + * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them + * to a either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the + * message to resynchronize with the backend, sharing a 'lost message' failure path. + */ + private volatile Throwable failedModification; protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor, ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) { @@ -50,11 +58,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { this.actorContext = actorContext; } - private Future completeOperation(Future operationFuture) { - operationFuture.onComplete(limiter, actorContext.getClientDispatcher()); - return operationFuture; - } - private ActorSelection getActor() { return actor; } @@ -63,10 +66,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return actorContext; } - protected Future executeOperationAsync(SerializableMessage msg, Timeout timeout) { - return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout)); - } - @Override public void closeTransaction() { LOG.debug("Tx {} closeTransaction called", getIdentifier()); @@ -108,8 +107,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return new BatchedModifications(getIdentifier(), getTransactionVersion()); } - private void batchModification(Modification modification) { + private void batchModification(Modification modification, boolean havePermit) { incrementModificationCount(); + if (havePermit) { + ++batchPermits; + } + if (batchedModifications == null) { batchedModifications = newBatchedModifications(); } @@ -141,13 +144,39 @@ public class RemoteTransactionContext extends AbstractTransactionContext { batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); final BatchedModifications toSend = batchedModifications; + final int permitsToRelease = batchPermits; + batchPermits = 0; + if (ready) { batchedModifications = null; } else { batchedModifications = newBatchedModifications(); + + final Throwable failure = failedModification; + if (failure != null) { + // We have observed a modification failure, it does not make sense to send this batch. This speeds + // up the time when the application could be blocked due to messages timing out and operation + // limiter kicking in. + LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier()); + limiter.release(permitsToRelease); + return Futures.failed(failure); + } } - sent = executeOperationAsync(toSend, actorContext.getTransactionCommitOperationTimeout()); + sent = actorContext.executeOperationAsync(getActor(), toSend.toSerializable(), + actorContext.getTransactionCommitOperationTimeout()); + sent.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object success) { + if (failure != null) { + LOG.debug("Tx {} modifications failed", getIdentifier(), failure); + failedModification = failure; + } else { + LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success); + } + limiter.release(permitsToRelease); + } + }, actorContext.getClientDispatcher()); } return sent; @@ -158,8 +187,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(), modification.getPath()); - acquireOperation(); - batchModification(modification); + final boolean havePermit = failedModification == null && acquireOperation(); + batchModification(modification, havePermit); } @Override @@ -167,15 +196,30 @@ public class RemoteTransactionContext extends AbstractTransactionContext { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); + final Throwable failure = failedModification; + if (failure != null) { + // If we know there was a previous modification failure, we must not send a read request, as it risks + // returning incorrect data. We check this before acquiring an operation simply because we want the app + // to complete this transaction as soon as possible. + returnFuture.setException(new ReadFailedException("Previous modification failed, cannot " + + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure)); + return; + } + // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - acquireOperation(); + final boolean havePermit = acquireOperation(); sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) throws Throwable { + public void onComplete(Throwable failure, Object response) { + // We have previously acquired an operation, now release it, no matter what happened + if (havePermit) { + limiter.release(); + } + if (failure != null) { LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(), failure); @@ -189,20 +233,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } }; - Future future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()), - actorContext.getOperationTimeout()); - + final Future future = actorContext.executeOperationAsync(getActor(), + readCmd.asVersion(getTransactionVersion()).toSerializable(), actorContext.getOperationTimeout()); future.onComplete(onComplete, actorContext.getClientDispatcher()); } /** - * Acquire operation from the limiter if the hand-off has completed. If - * the hand-off is still ongoing, this method does nothing. + * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method + * does nothing. + * + * @return True if a permit was successfully acquired, false otherwise */ - private void acquireOperation() { - if (isOperationHandOffComplete()) { - limiter.acquire(); - } + private boolean acquireOperation() { + return isOperationHandOffComplete() && limiter.acquire(); } @Override