X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=45edfc9333d9530b347e735756e48fc81c678b08;hp=ced91b5acd0cd7b5449a90f70f266c92379b4bc8;hb=d9e3fa9b7d4f1ad484931151c3b3ef237a674ab2;hpb=073066012e12969cf901accf9b5fbf4999a18934 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index ced91b5acd..45edfc9333 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -14,6 +14,7 @@ import com.google.common.util.concurrent.FutureCallback; import java.util.Collection; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; @@ -83,7 +84,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { // Sequence has already been checked @Override - @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + @Nullable TransactionSuccess doHandleRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (request instanceof ModifyTransactionRequest) { return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); @@ -101,7 +102,10 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + handleTransactionAbort(request.getSequence(), envelope, now); + return null; + } else if (request instanceof AbortLocalTransactionRequest) { + handleLocalTransactionAbort(request.getSequence(), envelope, now); return null; } else { LOG.warn("Rejecting unsupported request {}", request); @@ -142,11 +146,17 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { + private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) { + Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway", + getIdentifier()); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence))); + } + + private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) { if (readyCohort == null) { - openTransaction.abort(() -> recordAndSendSuccess(envelope, now, - new TransactionAbortSuccess(getIdentifier(), request.getSequence()))); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence))); return; } @@ -154,8 +164,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { @Override public void onSuccess(final Void result) { readyCohort = null; - recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), - request.getSequence())); + recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence)); LOG.debug("Transaction {} aborted", getIdentifier()); } @@ -199,7 +208,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { @@ -214,7 +223,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.commit(new FutureCallback() { @Override public void onSuccess(final UnsignedLong result) { @@ -229,7 +238,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulCommit(final RequestEnvelope envelope, final long startTime) { + void successfulCommit(final RequestEnvelope envelope, final long startTime) { recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), envelope.getMessage().getSequence())); readyCohort = null; @@ -288,7 +297,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } else if (m instanceof TransactionMerge) { modification.merge(m.getPath(), ((TransactionMerge) m).getData()); } else { - LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m); } } } @@ -315,7 +324,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { coordinatedCommit(envelope, now); return null; default: - LOG.warn("{}: rejecting unsupported protocol {}", history().persistenceId(), maybeProto.get()); + LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get()); throw new UnsupportedRequestException(request); } } @@ -325,7 +334,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { // only once. if (readyCohort == null) { readyCohort = openTransaction.ready(); - LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier()); + LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier()); openTransaction = null; } }