X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContext.java;h=ba1d05068b0ffc23b68097e8e4109baca1fbd9d6;hp=a25ddc873327accdc31995b160409025b19ce337;hb=5b69c8e66b12a29a36457955cac4a45affd7c73f;hpb=daaef05cbf70e6cbec9af181258faead6d9620a6 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index a25ddc8733..ba1d05068b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -8,24 +8,26 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; -import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import java.util.Optional; +import java.util.SortedSet; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.messages.DataExists; -import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -41,46 +43,36 @@ import scala.concurrent.Future; public class RemoteTransactionContext extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class); - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final ActorSelection actor; - private final boolean isTxActorLocal; - private final short remoteTransactionVersion; + private final OperationLimiter limiter; - private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; private int totalBatchedModificationsSent; - - protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier, - ActorContext actorContext, boolean isTxActorLocal, - short remoteTransactionVersion, OperationCompleter operationCompleter) { - super(identifier); + private int batchPermits; + + /** + * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend + * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them + * to a either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the + * message to resynchronize with the backend, sharing a 'lost message' failure path. + */ + private volatile Throwable failedModification; + + protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, + final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) { + super(identifier, remoteTransactionVersion); + this.limiter = requireNonNull(limiter); this.actor = actor; - this.actorContext = actorContext; - this.isTxActorLocal = isTxActorLocal; - this.remoteTransactionVersion = remoteTransactionVersion; - this.operationCompleter = operationCompleter; + this.actorUtils = actorUtils; } - private Future completeOperation(Future operationFuture){ - operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher()); - return operationFuture; - } - - private ActorSelection getActor() { return actor; } - protected ActorContext getActorContext() { - return actorContext; - } - - protected short getRemoteTransactionVersion() { - return remoteTransactionVersion; - } - - protected Future executeOperationAsync(SerializableMessage msg) { - return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable())); + protected ActorUtils getActorUtils() { + return actorUtils; } @Override @@ -88,189 +80,227 @@ public class RemoteTransactionContext extends AbstractTransactionContext { LOG.debug("Tx {} closeTransaction called", getIdentifier()); TransactionContextCleanup.untrack(this); - actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable()); + actorUtils.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable()); } @Override - public boolean supportsDirectCommit() { - return true; - } - - @Override - public Future directCommit() { + public Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. - - return sendBatchedModifications(true, true); + bumpPermits(havePermit); + return sendBatchedModifications(true, true, Optional.empty()); } @Override - public Future readyTransaction() { + public Future readyTransaction(final Boolean havePermit, + final Optional> participatingShardNames) { + logModificationCount(); + LOG.debug("Tx {} readyTransaction called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. - Future lastModificationsFuture = sendBatchedModifications(true, false); + bumpPermits(havePermit); + Future lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames); return transformReadyReply(lastModificationsFuture); } + private void bumpPermits(final Boolean havePermit) { + if (Boolean.TRUE.equals(havePermit)) { + ++batchPermits; + } + } + protected Future transformReadyReply(final Future readyReplyFuture) { // Transform the last reply Future into a Future that returns the cohort actor path from // the last reply message. That's the end result of the ready operation. - return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier()); + return TransactionReadyReplyMapper.transform(readyReplyFuture, actorUtils, getIdentifier()); } private BatchedModifications newBatchedModifications() { - return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId()); + return new BatchedModifications(getIdentifier(), getTransactionVersion()); } - private void batchModification(Modification modification) { - if(batchedModifications == null) { + private void batchModification(final Modification modification, final boolean havePermit) { + incrementModificationCount(); + if (havePermit) { + ++batchPermits; + } + + if (batchedModifications == null) { batchedModifications = newBatchedModifications(); } batchedModifications.addModification(modification); - if(batchedModifications.getModifications().size() >= - actorContext.getDatastoreContext().getShardBatchedModificationCount()) { + if (batchedModifications.getModifications().size() + >= actorUtils.getDatastoreContext().getShardBatchedModificationCount()) { sendBatchedModifications(); } } protected Future sendBatchedModifications() { - return sendBatchedModifications(false, false); + return sendBatchedModifications(false, false, Optional.empty()); } - protected Future sendBatchedModifications(boolean ready, boolean doCommitOnReady) { + protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, + final Optional> participatingShardNames) { Future sent = null; - if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) { - if(batchedModifications == null) { + if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { + if (batchedModifications == null) { batchedModifications = newBatchedModifications(); } - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), - batchedModifications.getModifications().size(), ready); - } + LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), + batchedModifications.getModifications().size(), ready); - batchedModifications.setReady(ready); batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); - sent = executeOperationAsync(batchedModifications); - if(ready) { + final BatchedModifications toSend = batchedModifications; + final int permitsToRelease = batchPermits; + batchPermits = 0; + + if (ready) { + batchedModifications.setReady(participatingShardNames); + batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications = null; } else { batchedModifications = newBatchedModifications(); + + final Throwable failure = failedModification; + if (failure != null) { + // We have observed a modification failure, it does not make sense to send this batch. This speeds + // up the time when the application could be blocked due to messages timing out and operation + // limiter kicking in. + LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier()); + limiter.release(permitsToRelease); + return Futures.failed(failure); + } } + + sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(), + actorUtils.getTransactionCommitOperationTimeout()); + sent.onComplete(new OnComplete<>() { + @Override + public void onComplete(final Throwable failure, final Object success) { + if (failure != null) { + LOG.debug("Tx {} modifications failed", getIdentifier(), failure); + failedModification = failure; + } else { + LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success); + } + limiter.release(permitsToRelease); + } + }, actorUtils.getClientDispatcher()); } return sent; } @Override - public void deleteData(YangInstanceIdentifier path) { - LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path); - - batchModification(new DeleteModification(path)); + public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { + LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path); + executeModification(new DeleteModification(path), havePermit); } @Override - public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path); - - batchModification(new MergeModification(path, data)); + public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, + final Boolean havePermit) { + LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path); + executeModification(new MergeModification(path, data), havePermit); } @Override - public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path); + public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, + final Boolean havePermit) { + LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path); + executeModification(new WriteModification(path, data), havePermit); + } + + private void executeModification(final AbstractModification modification, final Boolean havePermit) { + final boolean permitToRelease; + if (havePermit == null) { + permitToRelease = failedModification == null && acquireOperation(); + } else { + permitToRelease = havePermit.booleanValue(); + } - batchModification(new WriteModification(path, data)); + batchModification(modification, permitToRelease); } @Override - public void readData(final YangInstanceIdentifier path, - final SettableFuture>> returnFuture ) { - - LOG.debug("Tx {} readData called path = {}", getIdentifier(), path); + public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, + final Boolean havePermit) { + LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), + readCmd.getPath()); + + final Throwable failure = failedModification; + if (failure != null) { + // If we know there was a previous modification failure, we must not send a read request, as it risks + // returning incorrect data. We check this before acquiring an operation simply because we want the app + // to complete this transaction as soon as possible. + returnFuture.setException(new ReadFailedException("Previous modification failed, cannot " + + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure)); + return; + } // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. + final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue(); sendBatchedModifications(); - OnComplete onComplete = new OnComplete() { + OnComplete onComplete = new OnComplete<>() { @Override - public void onComplete(Throwable failure, Object readResponse) throws Throwable { - if(failure != null) { - LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure); - returnFuture.setException(new ReadFailedException( - "Error reading data for path " + path, failure)); - - } else { - LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure); - - if (readResponse instanceof ReadDataReply) { - ReadDataReply reply = (ReadDataReply) readResponse; - returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); + public void onComplete(final Throwable failure, final Object response) { + // We have previously acquired an operation, now release it, no matter what happened + if (permitToRelease) { + limiter.release(); + } - } else if (ReadDataReply.isSerializedType(readResponse)) { - ReadDataReply reply = ReadDataReply.fromSerializable(readResponse); - returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); + if (failure != null) { + LOG.debug("Tx {} {} operation failed", getIdentifier(), readCmd.getClass().getSimpleName(), + failure); - } else { - returnFuture.setException(new ReadFailedException( - "Invalid response reading data for path " + path)); - } + returnFuture.setException(new ReadFailedException("Error checking " + + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure)); + } else { + LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName()); + readCmd.processResponse(response, returnFuture); } } }; - Future readFuture = executeOperationAsync(new ReadData(path)); - - readFuture.onComplete(onComplete, actorContext.getClientDispatcher()); + final Future future = actorUtils.executeOperationAsync(getActor(), + readCmd.asVersion(getTransactionVersion()).toSerializable(), actorUtils.getOperationTimeout()); + future.onComplete(onComplete, actorUtils.getClientDispatcher()); } - @Override - public void dataExists(final YangInstanceIdentifier path, final SettableFuture returnFuture) { - - LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path); - - // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the - // public API contract. - - sendBatchedModifications(); - - OnComplete onComplete = new OnComplete() { - @Override - public void onComplete(Throwable failure, Object response) throws Throwable { - if(failure != null) { - LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure); - returnFuture.setException(new ReadFailedException( - "Error checking data exists for path " + path, failure)); - } else { - LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure); - - if (response instanceof DataExistsReply) { - returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists())); - - } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { - returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists())); - - } else { - returnFuture.setException(new ReadFailedException( - "Invalid response checking exists for path " + path)); - } - } - } - }; + /** + * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method + * does nothing. + * + * @return True if a permit was successfully acquired, false otherwise + */ + private boolean acquireOperation() { + checkState(isOperationHandOffComplete(), + "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff", + getIdentifier(), actor); + + if (limiter.acquire()) { + return true; + } - Future future = executeOperationAsync(new DataExists(path)); + LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor); + return false; + } - future.onComplete(onComplete, actorContext.getClientDispatcher()); + @Override + public boolean usesOperationLimiting() { + return true; } }