X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FSimpleShardDataTreeCohort.java;h=6c159b1f5e1ce65c211b4a0ea2442ec964c319b3;hp=bb016a28bdb29b994a8c23697e6c6fe7843a2d2b;hb=b99dc64f4c2373e28c3c94c11cedad0e5f7abe1d;hpb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index bb016a28bd..6c159b1f5e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -8,20 +8,18 @@ package org.opendaylight.controller.cluster.datastore; import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; +import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; -import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -29,9 +27,41 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Identifiable { +abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { + static final class DeadOnArrival extends SimpleShardDataTreeCohort { + private final Exception failure; + + DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction, + final TransactionIdentifier transactionId, final Exception failure) { + super(dataTree, transaction, transactionId, null); + this.failure = Preconditions.checkNotNull(failure); + } + + @Override + void throwCanCommitFailure() throws Exception { + throw failure; + } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return super.addToStringAttributes(toStringHelper).add("failure", failure); + } + } + + static final class Normal extends SimpleShardDataTreeCohort { + Normal(final ShardDataTree dataTree, final DataTreeModification transaction, + final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) { + super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts)); + } + + @Override + void throwCanCommitFailure() { + // No-op + } + } + private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); - private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final DataTreeModification transaction; private final ShardDataTree dataTree; private final TransactionIdentifier transactionId; @@ -47,7 +77,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide this.dataTree = Preconditions.checkNotNull(dataTree); this.transaction = Preconditions.checkNotNull(transaction); this.transactionId = Preconditions.checkNotNull(transactionId); - this.userCohorts = Preconditions.checkNotNull(userCohorts); + this.userCohorts = userCohorts; } @Override @@ -61,34 +91,30 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide } @Override - public DataTreeModification getDataTreeModification() { - DataTreeModification dataTreeModification = transaction; - if (transaction instanceof PruningDataTreeModification){ - dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification(); - } - return dataTreeModification; + DataTreeModification getDataTreeModification() { + return transaction; } - private void checkState(State expected) { + private void checkState(final State expected) { Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected); } @Override - public void canCommit(final FutureCallback callback) { - if(state == State.CAN_COMMIT_PENDING) { + public void canCommit(final FutureCallback newCallback) { + if (state == State.CAN_COMMIT_PENDING) { return; } checkState(State.READY); - this.callback = Preconditions.checkNotNull(callback); + this.callback = Preconditions.checkNotNull(newCallback); state = State.CAN_COMMIT_PENDING; dataTree.startCanCommit(this); } @Override - public void preCommit(final FutureCallback callback) { + public void preCommit(final FutureCallback newCallback) { checkState(State.CAN_COMMIT_COMPLETE); - this.callback = Preconditions.checkNotNull(callback); + this.callback = Preconditions.checkNotNull(newCallback); state = State.PRE_COMMIT_PENDING; if (nextFailure == null) { @@ -99,41 +125,50 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide } @Override - public ListenableFuture abort() { - dataTree.startAbort(this); + public void abort(final FutureCallback abortCallback) { + if (!dataTree.startAbort(this)) { + abortCallback.onSuccess(null); + return; + } + + candidate = null; state = State.ABORTED; - final Optional>> maybeAborts = userCohorts.abort(); + final Optional>> maybeAborts = userCohorts.abort(); if (!maybeAborts.isPresent()) { - return VOID_FUTURE; + abortCallback.onSuccess(null); + return; } - final Future> aborts = maybeAborts.get(); + final Future> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global()); if (aborts.isCompleted()) { - return VOID_FUTURE; + abortCallback.onSuccess(null); + return; } - final SettableFuture ret = SettableFuture.create(); aborts.onComplete(new OnComplete>() { @Override public void onComplete(final Throwable failure, final Iterable objs) { if (failure != null) { - ret.setException(failure); + abortCallback.onFailure(failure); } else { - ret.set(null); + abortCallback.onSuccess(null); } } }, ExecutionContexts.global()); - - return ret; } @Override - public void commit(final FutureCallback callback) { + public void commit(final FutureCallback newCallback) { checkState(State.PRE_COMMIT_COMPLETE); - this.callback = Preconditions.checkNotNull(callback); + this.callback = Preconditions.checkNotNull(newCallback); state = State.COMMIT_PENDING; - dataTree.startCommit(this, candidate); + + if (nextFailure == null) { + dataTree.startCommit(this, candidate); + } else { + failedCommit(nextFailure); + } } private FutureCallback switchState(final State newState) { @@ -145,6 +180,11 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide return ret; } + void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) { + checkState(State.PRE_COMMIT_COMPLETE); + this.candidate = Verify.verifyNotNull(dataTreeCandidate); + } + void successfulCanCommit() { switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null); } @@ -157,20 +197,21 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that * any failure to validate is propagated before we record the transaction. * - * @param candidate {@link DataTreeCandidate} under consideration - * @throws ExecutionException - * @throws TimeoutException + * @param dataTreeCandidate {@link DataTreeCandidate} under consideration + * @throws ExecutionException if the operation fails + * @throws TimeoutException if the operation times out */ // FIXME: this should be asynchronous - void userPreCommit(final DataTreeCandidate candidate) throws ExecutionException, TimeoutException { - userCohorts.canCommit(candidate); + void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException { + userCohorts.reset(); + userCohorts.canCommit(dataTreeCandidate); userCohorts.preCommit(); } - void successfulPreCommit(final DataTreeCandidateTip candidate) { - LOG.trace("Transaction {} prepared candidate {}", transaction, candidate); - this.candidate = Verify.verifyNotNull(candidate); - switchState(State.PRE_COMMIT_COMPLETE).onSuccess(candidate); + void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) { + LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate); + this.candidate = Verify.verifyNotNull(dataTreeCandidate); + switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate); } void failedPreCommit(final Exception cause) { @@ -215,8 +256,20 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide this.nextFailure = Preconditions.checkNotNull(cause); } + /** + * If there is an initial failure, throw it so the caller can process it. + * + * @throws Exception reported failure. + */ + abstract void throwCanCommitFailure() throws Exception; + @Override public boolean isFailed() { return state == State.FAILED || nextFailure != null; } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure); + } }