X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FSimpleShardDataTreeCohort.java;h=6c159b1f5e1ce65c211b4a0ea2442ec964c319b3;hp=9f22ce8a73e2380625f2f03c0e1ab5e7b35a2c29;hb=b99dc64f4c2373e28c3c94c11cedad0e5f7abe1d;hpb=dd174b7754c8ab975b6dd37d1891eafa3abba115 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index 9f22ce8a73..6c159b1f5e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -7,25 +7,82 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.base.Verify; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { + static final class DeadOnArrival extends SimpleShardDataTreeCohort { + private final Exception failure; + + DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction, + final TransactionIdentifier transactionId, final Exception failure) { + super(dataTree, transaction, transactionId, null); + this.failure = Preconditions.checkNotNull(failure); + } + + @Override + void throwCanCommitFailure() throws Exception { + throw failure; + } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return super.addToStringAttributes(toStringHelper).add("failure", failure); + } + } + + static final class Normal extends SimpleShardDataTreeCohort { + Normal(final ShardDataTree dataTree, final DataTreeModification transaction, + final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) { + super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts)); + } + + @Override + void throwCanCommitFailure() { + // No-op + } + } -final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); - private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); - private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final DataTreeModification transaction; private final ShardDataTree dataTree; + private final TransactionIdentifier transactionId; + private final CompositeDataTreeCohort userCohorts; + + private State state = State.READY; private DataTreeCandidateTip candidate; + private FutureCallback callback; + private Exception nextFailure; - SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { + SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction, + final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) { this.dataTree = Preconditions.checkNotNull(dataTree); this.transaction = Preconditions.checkNotNull(transaction); + this.transactionId = Preconditions.checkNotNull(transactionId); + this.userCohorts = userCohorts; + } + + @Override + public TransactionIdentifier getIdentifier() { + return transactionId; } @Override @@ -34,49 +91,185 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } @Override - public ListenableFuture canCommit() { - try { - dataTree.getDataTree().validate(transaction); - LOG.debug("Transaction {} validated", transaction); - return TRUE_FUTURE; - } catch (Exception e) { - return Futures.immediateFailedFuture(e); + DataTreeModification getDataTreeModification() { + return transaction; + } + + private void checkState(final State expected) { + Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected); + } + + @Override + public void canCommit(final FutureCallback newCallback) { + if (state == State.CAN_COMMIT_PENDING) { + return; } + + checkState(State.READY); + this.callback = Preconditions.checkNotNull(newCallback); + state = State.CAN_COMMIT_PENDING; + dataTree.startCanCommit(this); } @Override - public ListenableFuture preCommit() { - try { - candidate = dataTree.getDataTree().prepare(transaction); - /* - * FIXME: this is the place where we should be interacting with persistence, specifically by invoking - * persist on the candidate (which gives us a Future). - */ - LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); - return VOID_FUTURE; - } catch (Exception e) { - LOG.debug("Transaction {} failed to prepare", transaction, e); - return Futures.immediateFailedFuture(e); + public void preCommit(final FutureCallback newCallback) { + checkState(State.CAN_COMMIT_COMPLETE); + this.callback = Preconditions.checkNotNull(newCallback); + state = State.PRE_COMMIT_PENDING; + + if (nextFailure == null) { + dataTree.startPreCommit(this); + } else { + failedPreCommit(nextFailure); } } @Override - public ListenableFuture abort() { - // No-op, really - return VOID_FUTURE; + public void abort(final FutureCallback abortCallback) { + if (!dataTree.startAbort(this)) { + abortCallback.onSuccess(null); + return; + } + + candidate = null; + state = State.ABORTED; + + final Optional>> maybeAborts = userCohorts.abort(); + if (!maybeAborts.isPresent()) { + abortCallback.onSuccess(null); + return; + } + + final Future> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global()); + if (aborts.isCompleted()) { + abortCallback.onSuccess(null); + return; + } + + aborts.onComplete(new OnComplete>() { + @Override + public void onComplete(final Throwable failure, final Iterable objs) { + if (failure != null) { + abortCallback.onFailure(failure); + } else { + abortCallback.onSuccess(null); + } + } + }, ExecutionContexts.global()); } @Override - public ListenableFuture commit() { + public void commit(final FutureCallback newCallback) { + checkState(State.PRE_COMMIT_COMPLETE); + this.callback = Preconditions.checkNotNull(newCallback); + state = State.COMMIT_PENDING; + + if (nextFailure == null) { + dataTree.startCommit(this, candidate); + } else { + failedCommit(nextFailure); + } + } + + private FutureCallback switchState(final State newState) { + @SuppressWarnings("unchecked") + final FutureCallback ret = (FutureCallback) this.callback; + this.callback = null; + LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState); + this.state = newState; + return ret; + } + + void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) { + checkState(State.PRE_COMMIT_COMPLETE); + this.candidate = Verify.verifyNotNull(dataTreeCandidate); + } + + void successfulCanCommit() { + switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null); + } + + void failedCanCommit(final Exception cause) { + switchState(State.FAILED).onFailure(cause); + } + + /** + * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that + * any failure to validate is propagated before we record the transaction. + * + * @param dataTreeCandidate {@link DataTreeCandidate} under consideration + * @throws ExecutionException if the operation fails + * @throws TimeoutException if the operation times out + */ + // FIXME: this should be asynchronous + void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException { + userCohorts.reset(); + userCohorts.canCommit(dataTreeCandidate); + userCohorts.preCommit(); + } + + void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) { + LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate); + this.candidate = Verify.verifyNotNull(dataTreeCandidate); + switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate); + } + + void failedPreCommit(final Exception cause) { + if (LOG.isTraceEnabled()) { + LOG.trace("Transaction {} failed to prepare", transaction, cause); + } else { + LOG.error("Transaction {} failed to prepare", transactionId, cause); + } + + userCohorts.abort(); + switchState(State.FAILED).onFailure(cause); + } + + void successfulCommit(final UnsignedLong journalIndex) { try { - dataTree.getDataTree().commit(candidate); - } catch (Exception e) { - LOG.error("Transaction {} failed to commit", transaction, e); - return Futures.immediateFailedFuture(e); + userCohorts.commit(); + } catch (TimeoutException | ExecutionException e) { + // We are probably dead, depending on what the cohorts end up doing + LOG.error("User cohorts failed to commit", e); + } + + switchState(State.COMMITTED).onSuccess(journalIndex); + } + + void failedCommit(final Exception cause) { + if (LOG.isTraceEnabled()) { + LOG.trace("Transaction {} failed to commit", transaction, cause); + } else { + LOG.error("Transaction failed to commit", cause); } - LOG.debug("Transaction {} committed, proceeding to notify", transaction); - dataTree.notifyListeners(candidate); - return VOID_FUTURE; + userCohorts.abort(); + switchState(State.FAILED).onFailure(cause); + } + + @Override + public State getState() { + return state; + } + + void reportFailure(final Exception cause) { + this.nextFailure = Preconditions.checkNotNull(cause); + } + + /** + * If there is an initial failure, throw it so the caller can process it. + * + * @throws Exception reported failure. + */ + abstract void throwCanCommitFailure() throws Exception; + + @Override + public boolean isFailed() { + return state == State.FAILED || nextFailure != null; + } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure); } }