X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataTreeCohortActor.java;h=37fb4eac32ce427a62ac88e8b4d63722bade0f49;hp=e6ff10d8315cd47da347ae4764d20c7b825b51eb;hb=a6af137c30470b86d4bc624d4c48cb686495a182;hpb=013a6679470bf692753f2e04ab4398c97fd9f5d0 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java index e6ff10d831..37fb4eac32 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java @@ -11,7 +11,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; -import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executor; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.mdsal.common.api.PostCanCommitStep; @@ -27,21 +37,31 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * decapsulating DataTreeChanged messages and dispatching their context to the user. */ final class DataTreeCohortActor extends AbstractUntypedActor { - private final CohortBehaviour idleState = new Idle(); + private final Idle idleState = new Idle(); private final DOMDataTreeCommitCohort cohort; private final YangInstanceIdentifier registeredPath; - private CohortBehaviour currentState = idleState; + private final Map> currentStateMap = new HashMap<>(); private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) { - this.cohort = Preconditions.checkNotNull(cohort); - this.registeredPath = Preconditions.checkNotNull(registeredPath); + this.cohort = Objects.requireNonNull(cohort); + this.registeredPath = Objects.requireNonNull(registeredPath); } @Override protected void handleReceive(final Object message) { - currentState = currentState.handle(message); - } + if (!(message instanceof CommitProtocolCommand)) { + unknownMessage(message); + return; + } + + CommitProtocolCommand command = (CommitProtocolCommand)message; + CohortBehaviour currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState); + LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(), + currentState, message); + + currentState.handle(command); + } /** * Abstract message base for messages handled by {@link DataTreeCohortActor}. @@ -57,25 +77,31 @@ final class DataTreeCohortActor extends AbstractUntypedActor { } protected CommitProtocolCommand(TransactionIdentifier txId) { - this.txId = Preconditions.checkNotNull(txId); + this.txId = Objects.requireNonNull(txId); + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [txId=" + txId + "]"; } } static final class CanCommit extends CommitProtocolCommand { - private final DOMDataTreeCandidate candidate; + private final Collection candidates; private final ActorRef cohort; private final SchemaContext schema; - CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) { + CanCommit(TransactionIdentifier txId, Collection candidates, SchemaContext schema, + ActorRef cohort) { super(txId); - this.cohort = Preconditions.checkNotNull(cohort); - this.candidate = Preconditions.checkNotNull(candidate); - this.schema = Preconditions.checkNotNull(schema); + this.cohort = Objects.requireNonNull(cohort); + this.candidates = Objects.requireNonNull(candidates); + this.schema = Objects.requireNonNull(schema); } - DOMDataTreeCandidate getCandidate() { - return candidate; + Collection getCandidates() { + return candidates; } SchemaContext getSchema() { @@ -86,6 +112,10 @@ final class DataTreeCohortActor extends AbstractUntypedActor { return cohort; } + @Override + public String toString() { + return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]"; + } } abstract static class CommitReply { @@ -94,8 +124,8 @@ final class DataTreeCohortActor extends AbstractUntypedActor { private final TransactionIdentifier txId; protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) { - this.cohortRef = Preconditions.checkNotNull(cohortRef); - this.txId = Preconditions.checkNotNull(txId); + this.cohortRef = Objects.requireNonNull(cohortRef); + this.txId = Objects.requireNonNull(txId); } ActorRef getCohort() { @@ -105,6 +135,11 @@ final class DataTreeCohortActor extends AbstractUntypedActor { final TransactionIdentifier getTxId() { return txId; } + + @Override + public String toString() { + return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]"; + } } static final class Success extends CommitReply { @@ -112,7 +147,6 @@ final class DataTreeCohortActor extends AbstractUntypedActor { Success(ActorRef cohortRef, TransactionIdentifier txId) { super(cohortRef, txId); } - } static final class PreCommit extends CommitProtocolCommand { @@ -136,141 +170,170 @@ final class DataTreeCohortActor extends AbstractUntypedActor { } } - private abstract static class CohortBehaviour { + private abstract class CohortBehaviour, S extends ThreePhaseCommitStep> { + private final Class handledMessageType; - abstract Class getHandledMessageType(); + CohortBehaviour(Class handledMessageType) { + this.handledMessageType = Objects.requireNonNull(handledMessageType); + } - CohortBehaviour handle(Object message) { - if (getHandledMessageType().isInstance(message)) { - return process(getHandledMessageType().cast(message)); - } else if (message instanceof Abort) { - return abort(); + void handle(CommitProtocolCommand command) { + if (handledMessageType.isInstance(command)) { + onMessage(command); + } else if (command instanceof Abort) { + onAbort(((Abort)command).getTxId()); + } else { + getSender().tell(new Status.Failure(new IllegalArgumentException(String.format( + "Unexpected message %s in cohort behavior %s", command.getClass(), + getClass().getSimpleName()))), getSelf()); } - throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s", - message.getClass(), getClass().getSimpleName())); } - abstract CohortBehaviour abort(); + private void onMessage(CommitProtocolCommand message) { + final ActorRef sender = getSender(); + TransactionIdentifier txId = message.getTxId(); + ListenableFuture future = process(handledMessageType.cast(message)); + Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor() + : DataTreeCohortActor.this::executeInSelf; + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(S nextStep) { + success(txId, sender, nextStep); + } + + @Override + public void onFailure(Throwable failure) { + failed(txId, sender, failure); + } + }, callbackExecutor); + } - abstract CohortBehaviour process(E message); + private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) { + currentStateMap.remove(txId); + sender.tell(new Status.Failure(failure), getSelf()); + } - } + private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) { + currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep)); + sender.tell(new Success(getSelf(), txId), getSelf()); + } - private class Idle extends CohortBehaviour { + private void onAbort(TransactionIdentifier txId) { + currentStateMap.remove(txId); + final ActorRef sender = getSender(); + Futures.addCallback(abort(), new FutureCallback() { + @Override + public void onSuccess(Object noop) { + sender.tell(new Success(getSelf(), txId), getSelf()); + } + + @Override + public void onFailure(Throwable failure) { + LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure); + sender.tell(new Status.Failure(failure), getSelf()); + } + }, MoreExecutors.directExecutor()); + } + + @Nullable + abstract CohortBehaviour nextBehaviour(TransactionIdentifier txId, S nextStep); + + @Nonnull + abstract ListenableFuture process(M command); + + abstract ListenableFuture abort(); @Override - Class getHandledMessageType() { - return CanCommit.class; + public String toString() { + return getClass().getSimpleName(); + } + } + + private class Idle extends CohortBehaviour { + Idle() { + super(CanCommit.class); } @Override - @SuppressWarnings("checkstyle:IllegalCatch") - CohortBehaviour process(CanCommit message) { - final PostCanCommitStep nextStep; - try { - nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get(); - } catch (final Exception e) { - getSender().tell(new Status.Failure(e), getSelf()); - return this; - } - getSender().tell(new Success(getSelf(), message.getTxId()), getSelf()); - return new PostCanCommit(message.getTxId(), nextStep); + ListenableFuture process(CanCommit message) { + return cohort.canCommit(message.getTxId(), message.getCandidates(), message.getSchema()); } @Override - CohortBehaviour abort() { - return this; + CohortBehaviour nextBehaviour(TransactionIdentifier txId, PostCanCommitStep nextStep) { + return new PostCanCommit(txId, nextStep); } + @Override + ListenableFuture abort() { + return ThreePhaseCommitStep.NOOP_ABORT_FUTURE; + } } - - private abstract class CohortStateWithStep, S extends ThreePhaseCommitStep> - extends CohortBehaviour { - + private abstract class CohortStateWithStep, S extends ThreePhaseCommitStep, + N extends ThreePhaseCommitStep> extends CohortBehaviour { private final S step; private final TransactionIdentifier txId; - CohortStateWithStep(TransactionIdentifier txId, S step) { - this.txId = Preconditions.checkNotNull(txId); - this.step = Preconditions.checkNotNull(step); + CohortStateWithStep(Class handledMessageType, TransactionIdentifier txId, S step) { + super(handledMessageType); + this.txId = Objects.requireNonNull(txId); + this.step = Objects.requireNonNull(step); } final S getStep() { return step; } - final TransactionIdentifier getTxId() { - return txId; + @Override + ListenableFuture abort() { + return getStep().abort(); } @Override - @SuppressWarnings("checkstyle:IllegalCatch") - final CohortBehaviour abort() { - try { - getStep().abort().get(); - } catch (final Exception e) { - LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e); - getSender().tell(new Status.Failure(e), getSelf()); - return idleState; - } - getSender().tell(new Success(getSelf(), txId), getSelf()); - return idleState; + public String toString() { + return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]"; } - } - private class PostCanCommit extends CohortStateWithStep { + private class PostCanCommit extends CohortStateWithStep { PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) { - super(txId, nextStep); + super(PreCommit.class, txId, nextStep); } + @SuppressWarnings("unchecked") @Override - Class getHandledMessageType() { - return PreCommit.class; + ListenableFuture process(PreCommit message) { + return (ListenableFuture) getStep().preCommit(); } @Override - @SuppressWarnings("checkstyle:IllegalCatch") - CohortBehaviour process(PreCommit message) { - final PostPreCommitStep nextStep; - try { - nextStep = getStep().preCommit().get(); - } catch (final Exception e) { - getSender().tell(new Status.Failure(e), getSelf()); - return idleState; - } - getSender().tell(new Success(getSelf(), message.getTxId()), getSelf()); - return new PostPreCommit(getTxId(), nextStep); + CohortBehaviour nextBehaviour(TransactionIdentifier txId, PostPreCommitStep nextStep) { + return new PostPreCommit(txId, nextStep); } } - private class PostPreCommit extends CohortStateWithStep { + private class PostPreCommit extends CohortStateWithStep { PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) { - super(txId, step); + super(Commit.class, txId, step); } + @SuppressWarnings("unchecked") @Override - @SuppressWarnings("checkstyle:IllegalCatch") - CohortBehaviour process(Commit message) { - try { - getStep().commit().get(); - } catch (final Exception e) { - getSender().tell(new Status.Failure(e), getSender()); - return idleState; - } - getSender().tell(new Success(getSelf(), getTxId()), getSelf()); - return idleState; + ListenableFuture process(Commit message) { + return (ListenableFuture) getStep().commit(); } @Override - Class getHandledMessageType() { - return Commit.class; + CohortBehaviour nextBehaviour(TransactionIdentifier txId, NoopThreePhaseCommitStep nextStep) { + return null; } + } + private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep { } static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {