X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=777490b40c646def4d69c706f74dae49d0de3958;hp=f9d9580214e41c2d28430c3a46c68b005ad6e8bc;hb=62cddd88e42e8f3c6a92bbf42c97b0d6806f44ae;hpb=17a38939f6ba3cbbc1ff0f1f3e00b58f5002813d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index f9d9580214..777490b40c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -7,13 +7,15 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; import java.util.Collection; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import java.util.Optional; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; @@ -47,28 +49,127 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Frontend read-write transaction state as observed by the shard leader. + * Frontend read-write transaction state as observed by the shard leader. This class is NOT thread-safe. * * @author Robert Varga */ -@NotThreadSafe final class FrontendReadWriteTransaction extends FrontendTransaction { + private enum CommitStage { + READY, + CAN_COMMIT_PENDING, + CAN_COMMIT_COMPLETE, + PRE_COMMIT_PENDING, + PRE_COMMIT_COMPLETE, + COMMIT_PENDING, + } + + private abstract static class State { + @Override + public abstract String toString(); + } + + private static final class Failed extends State { + final RequestException cause; + + Failed(final RequestException cause) { + this.cause = requireNonNull(cause); + } + + @Override + public String toString() { + return "FAILED (" + cause.getMessage() + ")"; + } + } + + private static final class Open extends State { + final ReadWriteShardDataTreeTransaction openTransaction; + + Open(final ReadWriteShardDataTreeTransaction openTransaction) { + this.openTransaction = requireNonNull(openTransaction); + } + + @Override + public String toString() { + return "OPEN"; + } + } + + private static final class Ready extends State { + final ShardDataTreeCohort readyCohort; + CommitStage stage; + + Ready(final ShardDataTreeCohort readyCohort) { + this.readyCohort = requireNonNull(readyCohort); + this.stage = CommitStage.READY; + } + + @Override + public String toString() { + return "READY (" + stage + ")"; + } + } + + private static final class Sealed extends State { + final DataTreeModification sealedModification; + + Sealed(final DataTreeModification sealedModification) { + this.sealedModification = requireNonNull(sealedModification); + } + + @Override + public String toString() { + return "SEALED"; + } + } + + /** + * Retired state, needed to catch and suppress callbacks after we have removed associated state. + */ + private static final class Retired extends State { + private final String prevStateString; + + Retired(final State prevState) { + prevStateString = prevState.toString(); + } + + @Override + public String toString() { + return "RETIRED (in " + prevStateString + ")"; + } + } + private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class); + private static final State ABORTED = new State() { + @Override + public String toString() { + return "ABORTED"; + } + }; + private static final State ABORTING = new State() { + @Override + public String toString() { + return "ABORTING"; + } + }; + private static final State COMMITTED = new State() { + @Override + public String toString() { + return "COMMITTED"; + } + }; - private ReadWriteShardDataTreeTransaction openTransaction; - private DataTreeModification sealedModification; - private ShardDataTreeCohort readyCohort; + private State state; private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, final ReadWriteShardDataTreeTransaction transaction) { super(history, id); - this.openTransaction = Preconditions.checkNotNull(transaction); + this.state = new Open(transaction); } private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, final DataTreeModification mod) { super(history, id); - this.sealedModification = Preconditions.checkNotNull(mod); + this.state = new Sealed(mod); } static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history, @@ -83,7 +184,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { // Sequence has already been checked @Override - @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + TransactionSuccess doHandleRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (request instanceof ModifyTransactionRequest) { return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); @@ -101,105 +202,260 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + return handleTransactionAbort(request.getSequence(), envelope, now); + } else if (request instanceof AbortLocalTransactionRequest) { + handleLocalTransactionAbort(request.getSequence(), envelope, now); return null; } else { + LOG.warn("Rejecting unsupported request {}", request); throw new UnsupportedRequestException(request); } } + @Override + void retire() { + state = new Retired(state); + } + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - readyCohort.preCommit(new FutureCallback() { - @Override - public void onSuccess(final DataTreeCandidate result) { - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), - request.getSequence())); - } + throwIfFailed(); + + final Ready ready = checkReady(); + switch (ready.stage) { + case PRE_COMMIT_PENDING: + LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier()); + break; + case CAN_COMMIT_COMPLETE: + ready.stage = CommitStage.PRE_COMMIT_PENDING; + LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier()); + ready.readyCohort.preCommit(new FutureCallback() { + @Override + public void onSuccess(final DataTreeCandidate result) { + successfulPreCommit(envelope, now); + } + + @Override + public void onFailure(final Throwable failure) { + failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure)); + } + }); + break; + case CAN_COMMIT_PENDING: + case COMMIT_PENDING: + case PRE_COMMIT_COMPLETE: + case READY: + throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage); + default: + throwUnhandledCommitStage(ready); + } + } - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure)); - readyCohort = null; - } - }); + void successfulPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } + + final Ready ready = checkReady(); + LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); + recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + } + + void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause); + return; + } + + recordAndSendFailure(envelope, now, cause); + state = new Failed(cause); + LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause); } private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - readyCohort.commit(new FutureCallback() { - @Override - public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope, now); - } + throwIfFailed(); + + final Ready ready = checkReady(); + switch (ready.stage) { + case COMMIT_PENDING: + LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier()); + break; + case PRE_COMMIT_COMPLETE: + ready.stage = CommitStage.COMMIT_PENDING; + LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier()); + ready.readyCohort.commit(new FutureCallback() { + @Override + public void onSuccess(final UnsignedLong result) { + successfulCommit(envelope, now); + } + + @Override + public void onFailure(final Throwable failure) { + failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure)); + } + }); + break; + case CAN_COMMIT_COMPLETE: + case CAN_COMMIT_PENDING: + case PRE_COMMIT_PENDING: + case READY: + throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage); + default: + throwUnhandledCommitStage(ready); + } + } - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure)); - readyCohort = null; - } - }); + private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) { + checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence))); } - private void handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - if (readyCohort == null) { - openTransaction.abort(() -> recordAndSendSuccess(envelope, now, - new TransactionAbortSuccess(getIdentifier(), request.getSequence()))); - return; + private void startAbort() { + state = ABORTING; + LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier()); + } + + private void finishAbort() { + state = ABORTED; + LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier()); + } + + private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope, + final long now) { + if (state instanceof Open) { + final ReadWriteShardDataTreeTransaction openTransaction = checkOpen(); + startAbort(); + openTransaction.abort(() -> { + recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence)); + finishAbort(); + }); + return null; + } + if (ABORTING.equals(state)) { + LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier()); + return null; + } + if (ABORTED.equals(state)) { + // We should have recorded the reply + LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier()); + return new TransactionAbortSuccess(getIdentifier(), sequence); } - readyCohort.abort(new FutureCallback() { + final Ready ready = checkReady(); + startAbort(); + ready.readyCohort.abort(new FutureCallback() { @Override public void onSuccess(final Void result) { - readyCohort = null; - recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), - request.getSequence())); - LOG.debug("Transaction {} aborted", getIdentifier()); + recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence)); + finishAbort(); } @Override public void onFailure(final Throwable failure) { - readyCohort = null; - LOG.warn("Transaction {} abort failed", getIdentifier(), failure); recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); + LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure); + finishAbort(); } }); + return null; } - private void coordinatedCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), - envelope.getMessage().getSequence())); - } + private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException { + throwIfFailed(); - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); + final Ready ready = checkReady(); + switch (ready.stage) { + case CAN_COMMIT_PENDING: + LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier()); + break; + case READY: + ready.stage = CommitStage.CAN_COMMIT_PENDING; + LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier()); + checkReady().readyCohort.canCommit(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + successfulCanCommit(envelope, now); + } + + @Override + public void onFailure(final Throwable failure) { + failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); + } + }); + break; + case CAN_COMMIT_COMPLETE: + case COMMIT_PENDING: + case PRE_COMMIT_COMPLETE: + case PRE_COMMIT_PENDING: + throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage); + default: + throwUnhandledCommitStage(ready); + } } - private void directCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - successfulDirectCanCommit(envelope, now); - } + void successfulCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); + final Ready ready = checkReady(); + recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.CAN_COMMIT_COMPLETE; + LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); } - private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.preCommit(new FutureCallback() { + private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException { + throwIfFailed(); + + final Ready ready = checkReady(); + switch (ready.stage) { + case CAN_COMMIT_COMPLETE: + case CAN_COMMIT_PENDING: + case COMMIT_PENDING: + case PRE_COMMIT_COMPLETE: + case PRE_COMMIT_PENDING: + LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(), + getIdentifier(), state, envelope); + break; + case READY: + ready.stage = CommitStage.CAN_COMMIT_PENDING; + LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier()); + ready.readyCohort.canCommit(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + successfulDirectCanCommit(envelope, now); + } + + @Override + public void onFailure(final Throwable failure) { + failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); + } + }); + break; + default: + throwUnhandledCommitStage(ready); + } + } + + void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + + final Ready ready = checkReady(); + ready.stage = CommitStage.PRE_COMMIT_PENDING; + LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier()); + ready.readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { successfulDirectPreCommit(envelope, startTime); @@ -207,15 +463,21 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure)); - readyCohort = null; + failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure)); } }); } - private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.commit(new FutureCallback() { + void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); + ready.stage = CommitStage.COMMIT_PENDING; + LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier()); + ready.readyCohort.commit(new FutureCallback() { @Override public void onSuccess(final UnsignedLong result) { successfulCommit(envelope, startTime); @@ -223,43 +485,52 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure)); - readyCohort = null; + failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure)); } }); } - private void successfulCommit(final RequestEnvelope envelope, final long startTime) { - recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), + void successfulCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier()); + return; + } + + recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(), envelope.getMessage().getSequence())); - readyCohort = null; + state = COMMITTED; } private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - if (sealedModification.equals(request.getModification())) { - readyCohort = history().createReadyCohort(getIdentifier(), sealedModification); + final DataTreeModification sealedModification = checkSealed(); + if (!sealedModification.equals(request.getModification())) { + LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification()); + throw new UnsupportedRequestException(request); + } - if (request.isCoordinated()) { - coordinatedCommit(envelope, now); - } else { - directCommit(envelope, now); - } + final Optional optFailure = request.getDelayedFailure(); + if (optFailure.isPresent()) { + state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get())); } else { - throw new UnsupportedRequestException(request); + state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification, Optional.empty())); + } + + if (request.isCoordinated()) { + coordinatedCommit(envelope, now); + } else { + directCommit(envelope, now); } } - private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) { + final Optional> data = checkOpen().getSnapshot().readNode(request.getPath()); return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(), data.isPresent())); } - private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) { + final Optional> data = checkOpen().getSnapshot().readNode(request.getPath()); return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(), data)); } @@ -268,13 +539,10 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence)); } - private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - - final Collection mods = request.getModifications(); - if (!mods.isEmpty()) { - final DataTreeModification modification = openTransaction.getSnapshot(); - for (TransactionModification m : mods) { + private void applyModifications(final Collection modifications) { + if (!modifications.isEmpty()) { + final DataTreeModification modification = checkOpen().getSnapshot(); + for (TransactionModification m : modifications) { if (m instanceof TransactionDelete) { modification.delete(m.getPath()); } else if (m instanceof TransactionWrite) { @@ -282,44 +550,89 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } else if (m instanceof TransactionMerge) { modification.merge(m.getPath(), ((TransactionMerge) m).getData()); } else { - LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m); } } } + } - final java.util.Optional maybeProto = request.getPersistenceProtocol(); + private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no + // protocol, there is nothing for us to do. + final Optional maybeProto = request.getPersistenceProtocol(); if (!maybeProto.isPresent()) { + applyModifications(request.getModifications()); return replyModifySuccess(request.getSequence()); } switch (maybeProto.get()) { case ABORT: - openTransaction.abort(() -> replyModifySuccess(request.getSequence())); - openTransaction = null; + if (ABORTING.equals(state)) { + LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier()); + return null; + } + final ReadWriteShardDataTreeTransaction openTransaction = checkOpen(); + startAbort(); + openTransaction.abort(() -> { + recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(), + request.getSequence())); + finishAbort(); + }); return null; case READY: - ensureReady(); + ensureReady(request.getModifications()); return replyModifySuccess(request.getSequence()); case SIMPLE: - ensureReady(); + ensureReady(request.getModifications()); directCommit(envelope, now); return null; case THREE_PHASE: - ensureReady(); + ensureReady(request.getModifications()); coordinatedCommit(envelope, now); return null; default: + LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get()); throw new UnsupportedRequestException(request); } } - private void ensureReady() { + private void ensureReady(final Collection modifications) { // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction // only once. - if (readyCohort == null) { - readyCohort = openTransaction.ready(); - LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier()); - openTransaction = null; + if (state instanceof Ready) { + LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state); + return; + } + + applyModifications(modifications); + state = new Ready(checkOpen().ready(Optional.empty())); + LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier()); + } + + private void throwIfFailed() throws RequestException { + if (state instanceof Failed) { + LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier()); + throw ((Failed) state).cause; } } + + private ReadWriteShardDataTreeTransaction checkOpen() { + checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(), state); + return ((Open) state).openTransaction; + } + + private Ready checkReady() { + checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(), state); + return (Ready) state; + } + + private DataTreeModification checkSealed() { + checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(), state); + return ((Sealed) state).sealedModification; + } + + private static void throwUnhandledCommitStage(final Ready ready) { + throw new IllegalStateException("Unhandled commit stage " + ready.stage); + } }