X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=dfe063224492b4a290218522136981816fc423aa;hb=2dedb8231e13abe55d6b75eb532d23dbe536e168;hp=6cacb325a03fc441f99496d07552cd58cfe867df;hpb=5e60027f1ed6b966608d8020df47c6ec8fe16716;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 6cacb325a0..dfe0632244 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -7,13 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; import java.util.Collection; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import java.util.Optional; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; @@ -41,18 +42,18 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Frontend read-write transaction state as observed by the shard leader. + * Frontend read-write transaction state as observed by the shard leader. This class is NOT thread-safe. * * @author Robert Varga */ -@NotThreadSafe final class FrontendReadWriteTransaction extends FrontendTransaction { private enum CommitStage { READY, @@ -72,7 +73,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { final RequestException cause; Failed(final RequestException cause) { - this.cause = Preconditions.checkNotNull(cause); + this.cause = requireNonNull(cause); } @Override @@ -85,7 +86,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { final ReadWriteShardDataTreeTransaction openTransaction; Open(final ReadWriteShardDataTreeTransaction openTransaction) { - this.openTransaction = Preconditions.checkNotNull(openTransaction); + this.openTransaction = requireNonNull(openTransaction); } @Override @@ -99,8 +100,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { CommitStage stage; Ready(final ShardDataTreeCohort readyCohort) { - this.readyCohort = Preconditions.checkNotNull(readyCohort); - this.stage = CommitStage.READY; + this.readyCohort = requireNonNull(readyCohort); + stage = CommitStage.READY; } @Override @@ -113,7 +114,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { final DataTreeModification sealedModification; Sealed(final DataTreeModification sealedModification) { - this.sealedModification = Preconditions.checkNotNull(sealedModification); + this.sealedModification = requireNonNull(sealedModification); } @Override @@ -122,6 +123,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + /** + * Retired state, needed to catch and suppress callbacks after we have removed associated state. + */ + private static final class Retired extends State { + private final String prevStateString; + + Retired(final State prevState) { + prevStateString = prevState.toString(); + } + + @Override + public String toString() { + return "RETIRED (in " + prevStateString + ")"; + } + } + private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class); private static final State ABORTED = new State() { @Override @@ -147,13 +164,13 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, final ReadWriteShardDataTreeTransaction transaction) { super(history, id); - this.state = new Open(transaction); + state = new Open(transaction); } private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, final DataTreeModification mod) { super(history, id); - this.state = new Sealed(mod); + state = new Sealed(mod); } static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history, @@ -168,7 +185,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { // Sequence has already been checked @Override - @Nullable TransactionSuccess doHandleRequest(final TransactionRequest request, final RequestEnvelope envelope, + TransactionSuccess doHandleRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (request instanceof ModifyTransactionRequest) { return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); @@ -196,6 +213,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + @Override + void retire() { + state = new Retired(state); + } + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, final RequestEnvelope envelope, final long now) throws RequestException { throwIfFailed(); @@ -211,10 +233,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { ready.readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { - LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(), - request.getSequence())); - ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + successfulPreCommit(envelope, now); } @Override @@ -229,11 +248,30 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { case READY: throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage); default: - throw new IllegalStateException("Unhandled commit stage " + ready.stage); + throwUnhandledCommitStage(ready); + } + } + + void successfulPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; } + + final Ready ready = checkReady(); + LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); + recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.PRE_COMMIT_COMPLETE; } - private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause); + return; + } + recordAndSendFailure(envelope, now, cause); state = new Failed(cause); LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause); @@ -269,7 +307,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { case READY: throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage); default: - throw new IllegalStateException("Unhandled commit stage " + ready.stage); + throwUnhandledCommitStage(ready); } } @@ -312,9 +350,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { final Ready ready = checkReady(); startAbort(); - ready.readyCohort.abort(new FutureCallback() { + ready.readyCohort.abort(new FutureCallback<>() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final Empty result) { recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence)); finishAbort(); } @@ -340,13 +378,10 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { case READY: ready.stage = CommitStage.CAN_COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier()); - checkReady().readyCohort.canCommit(new FutureCallback() { + checkReady().readyCohort.canCommit(new FutureCallback<>() { @Override - public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(), - envelope.getMessage().getSequence())); - ready.stage = CommitStage.CAN_COMMIT_COMPLETE; - LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); + public void onSuccess(final Empty result) { + successfulCanCommit(envelope, now); } @Override @@ -361,8 +396,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { case PRE_COMMIT_PENDING: throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage); default: - throw new IllegalStateException("Unhandled commit stage " + ready.stage); + throwUnhandledCommitStage(ready); + } + } + + void successfulCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; } + + final Ready ready = checkReady(); + recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.CAN_COMMIT_COMPLETE; + LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); } private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException { @@ -381,9 +430,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { case READY: ready.stage = CommitStage.CAN_COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier()); - ready.readyCohort.canCommit(new FutureCallback() { + ready.readyCohort.canCommit(new FutureCallback<>() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final Empty result) { successfulDirectCanCommit(envelope, now); } @@ -394,11 +443,16 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); break; default: - throw new IllegalStateException("Unhandled commit stage " + ready.stage); + throwUnhandledCommitStage(ready); } } void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.PRE_COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier()); @@ -416,6 +470,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier()); @@ -433,6 +492,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier()); + return; + } + recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(), envelope.getMessage().getSequence())); state = COMMITTED; @@ -446,11 +510,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { throw new UnsupportedRequestException(request); } - final java.util.Optional optFailure = request.getDelayedFailure(); + final Optional optFailure = request.getDelayedFailure(); if (optFailure.isPresent()) { state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get())); } else { - state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification)); + state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification, Optional.empty())); } if (request.isCoordinated()) { @@ -460,16 +524,14 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } - private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) - throws RequestException { - final Optional> data = checkOpen().getSnapshot().readNode(request.getPath()); + private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) { + final Optional data = checkOpen().getSnapshot().readNode(request.getPath()); return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(), data.isPresent())); } - private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) - throws RequestException { - final Optional> data = checkOpen().getSnapshot().readNode(request.getPath()); + private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) { + final Optional data = checkOpen().getSnapshot().readNode(request.getPath()); return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(), data)); } @@ -499,7 +561,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { final RequestEnvelope envelope, final long now) throws RequestException { // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no // protocol, there is nothing for us to do. - final java.util.Optional maybeProto = request.getPersistenceProtocol(); + final Optional maybeProto = request.getPersistenceProtocol(); if (!maybeProto.isPresent()) { applyModifications(request.getModifications()); return replyModifySuccess(request.getSequence()); @@ -545,7 +607,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } applyModifications(modifications); - state = new Ready(checkOpen().ready()); + state = new Ready(checkOpen().ready(Optional.empty())); LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier()); } @@ -557,20 +619,21 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } private ReadWriteShardDataTreeTransaction checkOpen() { - Preconditions.checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(), - state); + checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(), state); return ((Open) state).openTransaction; } private Ready checkReady() { - Preconditions.checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(), - state); + checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(), state); return (Ready) state; } private DataTreeModification checkSealed() { - Preconditions.checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(), - state); + checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(), state); return ((Sealed) state).sealedModification; } + + private static void throwUnhandledCommitStage(final Ready ready) { + throw new IllegalStateException("Unhandled commit stage " + ready.stage); + } }