X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=45edfc9333d9530b347e735756e48fc81c678b08;hp=f9d9580214e41c2d28430c3a46c68b005ad6e8bc;hb=d9e3fa9b7d4f1ad484931151c3b3ef237a674ab2;hpb=17a38939f6ba3cbbc1ff0f1f3e00b58f5002813d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index f9d9580214..45edfc9333 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -14,6 +14,7 @@ import com.google.common.util.concurrent.FutureCallback; import java.util.Collection; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; @@ -83,7 +84,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { // Sequence has already been checked @Override - @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + @Nullable TransactionSuccess doHandleRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (request instanceof ModifyTransactionRequest) { return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); @@ -101,9 +102,13 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + handleTransactionAbort(request.getSequence(), envelope, now); + return null; + } else if (request instanceof AbortLocalTransactionRequest) { + handleLocalTransactionAbort(request.getSequence(), envelope, now); return null; } else { + LOG.warn("Rejecting unsupported request {}", request); throw new UnsupportedRequestException(request); } } @@ -141,11 +146,17 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { + private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) { + Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway", + getIdentifier()); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence))); + } + + private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) { if (readyCohort == null) { - openTransaction.abort(() -> recordAndSendSuccess(envelope, now, - new TransactionAbortSuccess(getIdentifier(), request.getSequence()))); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence))); return; } @@ -153,8 +164,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { @Override public void onSuccess(final Void result) { readyCohort = null; - recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), - request.getSequence())); + recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence)); LOG.debug("Transaction {} aborted", getIdentifier()); } @@ -198,7 +208,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { @@ -213,9 +223,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.commit(new FutureCallback() { - @Override public void onSuccess(final UnsignedLong result) { successfulCommit(envelope, startTime); @@ -229,7 +238,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulCommit(final RequestEnvelope envelope, final long startTime) { + void successfulCommit(final RequestEnvelope envelope, final long startTime) { recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), envelope.getMessage().getSequence())); readyCohort = null; @@ -237,16 +246,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - if (sealedModification.equals(request.getModification())) { + if (!sealedModification.equals(request.getModification())) { + LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification()); + throw new UnsupportedRequestException(request); + } + + final java.util.Optional optFailure = request.getDelayedFailure(); + if (optFailure.isPresent()) { + readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()); + } else { readyCohort = history().createReadyCohort(getIdentifier(), sealedModification); + } - if (request.isCoordinated()) { - coordinatedCommit(envelope, now); - } else { - directCommit(envelope, now); - } + if (request.isCoordinated()) { + coordinatedCommit(envelope, now); } else { - throw new UnsupportedRequestException(request); + directCommit(envelope, now); } } @@ -282,7 +297,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } else if (m instanceof TransactionMerge) { modification.merge(m.getPath(), ((TransactionMerge) m).getData()); } else { - LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m); } } } @@ -309,6 +324,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { coordinatedCommit(envelope, now); return null; default: + LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get()); throw new UnsupportedRequestException(request); } } @@ -318,7 +334,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { // only once. if (readyCohort == null) { readyCohort = openTransaction.ready(); - LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier()); + LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier()); openTransaction = null; } }