X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=50e913025d6b9a7ab818393a47c61d4a384f202a;hb=refs%2Fchanges%2F36%2F60436%2F1;hp=6cacb325a03fc441f99496d07552cd58cfe867df;hpb=5e60027f1ed6b966608d8020df47c6ec8fe16716;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 6cacb325a0..50e913025d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -122,6 +122,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + /** + * Retired state, needed to catch and suppress callbacks after we have removed associated state. + */ + private static final class Retired extends State { + private final String prevStateString; + + Retired(final State prevState) { + prevStateString = prevState.toString(); + } + + @Override + public String toString() { + return "RETIRED (in " + prevStateString + ")"; + } + } + private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class); private static final State ABORTED = new State() { @Override @@ -196,6 +212,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + @Override + void retire() { + state = new Retired(state); + } + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, final RequestEnvelope envelope, final long now) throws RequestException { throwIfFailed(); @@ -211,10 +232,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { ready.readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { - LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(), - request.getSequence())); - ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + successfulPreCommit(envelope, now); } @Override @@ -233,7 +251,26 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } - private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + void successfulPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } + + final Ready ready = checkReady(); + LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); + recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + } + + void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause); + return; + } + recordAndSendFailure(envelope, now, cause); state = new Failed(cause); LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause); @@ -343,10 +380,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { checkReady().readyCohort.canCommit(new FutureCallback() { @Override public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(), - envelope.getMessage().getSequence())); - ready.stage = CommitStage.CAN_COMMIT_COMPLETE; - LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); + successfulCanCommit(envelope, now); } @Override @@ -365,6 +399,20 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + void successfulCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } + + final Ready ready = checkReady(); + recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.CAN_COMMIT_COMPLETE; + LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); + } + private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException { throwIfFailed(); @@ -399,6 +447,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.PRE_COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier()); @@ -416,6 +469,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier()); @@ -433,6 +491,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier()); + return; + } + recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(), envelope.getMessage().getSequence())); state = COMMITTED;