X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FSimpleShardDataTreeCohort.java;h=a65cef82286d338d6612456cde480ad07d6108e2;hp=b9e39975e5c869529a31fa2a56dad6e0c0db9378;hb=e84f63ee098fff5b02cbce1281ca0d1208f966fa;hpb=0b3bfca7f39ee1bb6dcf5379f44d0b402adeb7fe diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index b9e39975e5..a65cef8228 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -7,59 +7,32 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.dispatch.ExecutionContexts; -import akka.dispatch.Futures; -import akka.dispatch.OnComplete; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.SortedSet; +import java.util.concurrent.CompletionStage; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; - -abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { - static final class DeadOnArrival extends SimpleShardDataTreeCohort { - private final Exception failure; - - DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction, - final TransactionIdentifier transactionId, final Exception failure) { - super(dataTree, transaction, transactionId, null); - this.failure = Preconditions.checkNotNull(failure); - } - - @Override - void throwCanCommitFailure() throws Exception { - throw failure; - } - } - - static final class Normal extends SimpleShardDataTreeCohort { - Normal(final ShardDataTree dataTree, final DataTreeModification transaction, - final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) { - super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts)); - } - - @Override - void throwCanCommitFailure() { - // No-op - } - } +final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); private final DataTreeModification transaction; private final ShardDataTree dataTree; private final TransactionIdentifier transactionId; private final CompositeDataTreeCohort userCohorts; + private final @Nullable SortedSet participatingShardNames; private State state = State.READY; private DataTreeCandidateTip candidate; @@ -67,11 +40,23 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private Exception nextFailure; SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction, - final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) { - this.dataTree = Preconditions.checkNotNull(dataTree); - this.transaction = Preconditions.checkNotNull(transaction); - this.transactionId = Preconditions.checkNotNull(transactionId); - this.userCohorts = userCohorts; + final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts, + final Optional> participatingShardNames) { + this.dataTree = requireNonNull(dataTree); + this.transaction = requireNonNull(transaction); + this.transactionId = requireNonNull(transactionId); + this.userCohorts = requireNonNull(userCohorts); + this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null); + } + + SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction, + final TransactionIdentifier transactionId, final Exception nextFailure) { + this.dataTree = requireNonNull(dataTree); + this.transaction = requireNonNull(transaction); + this.transactionId = requireNonNull(transactionId); + userCohorts = null; + participatingShardNames = null; + this.nextFailure = requireNonNull(nextFailure); } @Override @@ -89,8 +74,14 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { return transaction; } + @Override + Optional> getParticipatingShardNames() { + return Optional.ofNullable(participatingShardNames); + } + private void checkState(final State expected) { - Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected); + Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s", + state, expected, getIdentifier()); } @Override @@ -100,15 +91,20 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } checkState(State.READY); - this.callback = Preconditions.checkNotNull(newCallback); + callback = requireNonNull(newCallback); state = State.CAN_COMMIT_PENDING; - dataTree.startCanCommit(this); + + if (nextFailure == null) { + dataTree.startCanCommit(this); + } else { + failedCanCommit(nextFailure); + } } @Override public void preCommit(final FutureCallback newCallback) { checkState(State.CAN_COMMIT_COMPLETE); - this.callback = Preconditions.checkNotNull(newCallback); + callback = requireNonNull(newCallback); state = State.PRE_COMMIT_PENDING; if (nextFailure == null) { @@ -128,34 +124,25 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { candidate = null; state = State.ABORTED; - final Optional>> maybeAborts = userCohorts.abort(); + final Optional> maybeAborts = userCohorts.abort(); if (!maybeAborts.isPresent()) { abortCallback.onSuccess(null); return; } - final Future> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global()); - if (aborts.isCompleted()) { - abortCallback.onSuccess(null); - return; - } - - aborts.onComplete(new OnComplete>() { - @Override - public void onComplete(final Throwable failure, final Iterable objs) { - if (failure != null) { - abortCallback.onFailure(failure); - } else { - abortCallback.onSuccess(null); - } + maybeAborts.get().whenComplete((noop, failure) -> { + if (failure != null) { + abortCallback.onFailure(failure); + } else { + abortCallback.onSuccess(null); } - }, ExecutionContexts.global()); + }); } @Override public void commit(final FutureCallback newCallback) { checkState(State.PRE_COMMIT_COMPLETE); - this.callback = Preconditions.checkNotNull(newCallback); + callback = requireNonNull(newCallback); state = State.COMMIT_PENDING; if (nextFailure == null) { @@ -167,16 +154,16 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private FutureCallback switchState(final State newState) { @SuppressWarnings("unchecked") - final FutureCallback ret = (FutureCallback) this.callback; - this.callback = null; + final FutureCallback ret = (FutureCallback) callback; + callback = null; LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState); - this.state = newState; + state = newState; return ret; } void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) { checkState(State.PRE_COMMIT_COMPLETE); - this.candidate = Verify.verifyNotNull(dataTreeCandidate); + candidate = verifyNotNull(dataTreeCandidate); } void successfulCanCommit() { @@ -192,23 +179,49 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { * any failure to validate is propagated before we record the transaction. * * @param dataTreeCandidate {@link DataTreeCandidate} under consideration - * @throws ExecutionException if the operation fails - * @throws TimeoutException if the operation times out + * @param futureCallback the callback to invoke on completion, which may be immediate or async. */ - // FIXME: this should be asynchronous - void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException { + void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback futureCallback) { userCohorts.reset(); - userCohorts.canCommit(dataTreeCandidate); - userCohorts.preCommit(); + + final Optional> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate); + if (!maybeCanCommitFuture.isPresent()) { + doUserPreCommit(futureCallback); + return; + } + + maybeCanCommitFuture.get().whenComplete((noop, failure) -> { + if (failure != null) { + futureCallback.onFailure(failure); + } else { + doUserPreCommit(futureCallback); + } + }); + } + + private void doUserPreCommit(final FutureCallback futureCallback) { + final Optional> maybePreCommitFuture = userCohorts.preCommit(); + if (!maybePreCommitFuture.isPresent()) { + futureCallback.onSuccess(null); + return; + } + + maybePreCommitFuture.get().whenComplete((noop, failure) -> { + if (failure != null) { + futureCallback.onFailure(failure); + } else { + futureCallback.onSuccess(null); + } + }); } void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) { LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate); - this.candidate = Verify.verifyNotNull(dataTreeCandidate); + candidate = verifyNotNull(dataTreeCandidate); switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate); } - void failedPreCommit(final Exception cause) { + void failedPreCommit(final Throwable cause) { if (LOG.isTraceEnabled()) { LOG.trace("Transaction {} failed to prepare", transaction, cause); } else { @@ -219,15 +232,25 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { switchState(State.FAILED).onFailure(cause); } - void successfulCommit(final UnsignedLong journalIndex) { - try { - userCohorts.commit(); - } catch (TimeoutException | ExecutionException e) { - // We are probably dead, depending on what the cohorts end up doing - LOG.error("User cohorts failed to commit", e); + void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) { + final Optional> maybeCommitFuture = userCohorts.commit(); + if (!maybeCommitFuture.isPresent()) { + finishSuccessfulCommit(journalIndex, onComplete); + return; } + maybeCommitFuture.get().whenComplete((noop, failure) -> { + if (failure != null) { + LOG.error("User cohorts failed to commit", failure); + } + + finishSuccessfulCommit(journalIndex, onComplete); + }); + } + + private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) { switchState(State.COMMITTED).onSuccess(journalIndex); + onComplete.run(); } void failedCommit(final Exception cause) { @@ -247,18 +270,20 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } void reportFailure(final Exception cause) { - this.nextFailure = Preconditions.checkNotNull(cause); + if (nextFailure == null) { + nextFailure = requireNonNull(cause); + } else { + LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause); + } } - /** - * If there is an initial failure, throw it so the caller can process it. - * - * @throws Exception reported failure. - */ - abstract void throwCanCommitFailure() throws Exception; - @Override public boolean isFailed() { return state == State.FAILED || nextFailure != null; } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure); + } }