import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
-import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.mdsal.common.api.PostCanCommitStep;
import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
* decapsulating DataTreeChanged messages and dispatching their context to the user.
*/
final class DataTreeCohortActor extends AbstractUntypedActor {
- private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
- private final CohortBehaviour<?> idleState = new Idle();
+ private final Idle idleState = new Idle();
private final DOMDataTreeCommitCohort cohort;
- private CohortBehaviour<?> currentState = idleState;
+ private final YangInstanceIdentifier registeredPath;
+ private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
- private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
- this.cohort = Preconditions.checkNotNull(cohort);
+ private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
+ this.cohort = Objects.requireNonNull(cohort);
+ this.registeredPath = Objects.requireNonNull(registeredPath);
}
@Override
protected void handleReceive(final Object message) {
- currentState = currentState.handle(message);
- }
+ if (!(message instanceof CommitProtocolCommand)) {
+ unknownMessage(message);
+ return;
+ }
+
+ CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
+ CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
+ LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
+ currentState, message);
+
+ currentState.handle(command);
+ }
/**
* Abstract message base for messages handled by {@link DataTreeCohortActor}.
*
* @param <R> Reply message type
*/
- static abstract class CommitProtocolCommand<R extends CommitReply> {
+ abstract static class CommitProtocolCommand<R extends CommitReply> {
private final TransactionIdentifier txId;
}
protected CommitProtocolCommand(TransactionIdentifier txId) {
- this.txId = Preconditions.checkNotNull(txId);
+ this.txId = Objects.requireNonNull(txId);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " [txId=" + txId + "]";
}
}
static final class CanCommit extends CommitProtocolCommand<Success> {
- private final DOMDataTreeCandidate candidate;
+ private final Collection<DOMDataTreeCandidate> candidates;
private final ActorRef cohort;
private final SchemaContext schema;
- CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
+ CanCommit(TransactionIdentifier txId, Collection<DOMDataTreeCandidate> candidates, SchemaContext schema,
+ ActorRef cohort) {
super(txId);
- this.cohort = Preconditions.checkNotNull(cohort);
- this.candidate = Preconditions.checkNotNull(candidate);
- this.schema = Preconditions.checkNotNull(schema);
+ this.cohort = Objects.requireNonNull(cohort);
+ this.candidates = Objects.requireNonNull(candidates);
+ this.schema = Objects.requireNonNull(schema);
}
- DOMDataTreeCandidate getCandidate() {
- return candidate;
+ Collection<DOMDataTreeCandidate> getCandidates() {
+ return candidates;
}
SchemaContext getSchema() {
return cohort;
}
+ @Override
+ public String toString() {
+ return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]";
+ }
}
- static abstract class CommitReply {
+ abstract static class CommitReply {
private final ActorRef cohortRef;
private final TransactionIdentifier txId;
protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
- this.cohortRef = Preconditions.checkNotNull(cohortRef);
- this.txId = Preconditions.checkNotNull(txId);
+ this.cohortRef = Objects.requireNonNull(cohortRef);
+ this.txId = Objects.requireNonNull(txId);
}
ActorRef getCohort() {
final TransactionIdentifier getTxId() {
return txId;
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
+ }
}
static final class Success extends CommitReply {
- public Success(ActorRef cohortRef, TransactionIdentifier txId) {
+ Success(ActorRef cohortRef, TransactionIdentifier txId) {
super(cohortRef, txId);
}
-
}
static final class PreCommit extends CommitProtocolCommand<Success> {
- public PreCommit(TransactionIdentifier txId) {
+ PreCommit(TransactionIdentifier txId) {
super(txId);
}
}
static final class Abort extends CommitProtocolCommand<Success> {
- public Abort(TransactionIdentifier txId) {
+ Abort(TransactionIdentifier txId) {
super(txId);
}
}
static final class Commit extends CommitProtocolCommand<Success> {
- public Commit(TransactionIdentifier txId) {
+ Commit(TransactionIdentifier txId) {
super(txId);
}
}
- private static abstract class CohortBehaviour<E> {
+ private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
+ private final Class<M> handledMessageType;
- abstract Class<E> getHandledMessageType();
+ CohortBehaviour(Class<M> handledMessageType) {
+ this.handledMessageType = Objects.requireNonNull(handledMessageType);
+ }
- CohortBehaviour<?> handle(Object message) {
- if (getHandledMessageType().isInstance(message)) {
- return process(getHandledMessageType().cast(message));
- } else if (message instanceof Abort) {
- return abort();
+ void handle(CommitProtocolCommand<?> command) {
+ if (handledMessageType.isInstance(command)) {
+ onMessage(command);
+ } else if (command instanceof Abort) {
+ onAbort(((Abort)command).getTxId());
+ } else {
+ getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
+ "Unexpected message %s in cohort behavior %s", command.getClass(),
+ getClass().getSimpleName()))), getSelf());
}
- throw new UnsupportedOperationException();
}
- abstract CohortBehaviour<?> abort();
+ private void onMessage(CommitProtocolCommand<?> message) {
+ final ActorRef sender = getSender();
+ TransactionIdentifier txId = message.getTxId();
+ ListenableFuture<S> future = process(handledMessageType.cast(message));
+ Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
+ : DataTreeCohortActor.this::executeInSelf;
+ Futures.addCallback(future, new FutureCallback<S>() {
+ @Override
+ public void onSuccess(S nextStep) {
+ success(txId, sender, nextStep);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ failed(txId, sender, failure);
+ }
+ }, callbackExecutor);
+ }
- abstract CohortBehaviour<?> process(E message);
+ private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) {
+ currentStateMap.remove(txId);
+ sender.tell(new Status.Failure(failure), getSelf());
+ }
- }
+ private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) {
+ currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
+ sender.tell(new Success(getSelf(), txId), getSelf());
+ }
- private class Idle extends CohortBehaviour<CanCommit> {
+ private void onAbort(TransactionIdentifier txId) {
+ currentStateMap.remove(txId);
+ final ActorRef sender = getSender();
+ Futures.addCallback(abort(), new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object noop) {
+ sender.tell(new Success(getSelf(), txId), getSelf());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
+ sender.tell(new Status.Failure(failure), getSelf());
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ @Nullable
+ abstract CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
+
+ @Nonnull
+ abstract ListenableFuture<S> process(M command);
+
+ abstract ListenableFuture<?> abort();
@Override
- Class<CanCommit> getHandledMessageType() {
- return CanCommit.class;
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+ }
+
+ private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
+ Idle() {
+ super(CanCommit.class);
}
@Override
- CohortBehaviour<?> process(CanCommit message) {
- final PostCanCommitStep nextStep;
- try {
- nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
- } catch (final Exception e) {
- getSender().tell(new Status.Failure(e), getSelf());
- return this;
- }
- getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
- return new PostCanCommit(message.getTxId(), nextStep);
+ ListenableFuture<PostCanCommitStep> process(CanCommit message) {
+ return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
}
@Override
- CohortBehaviour<?> abort() {
- return this;
+ CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostCanCommitStep nextStep) {
+ return new PostCanCommit(txId, nextStep);
}
+ @Override
+ ListenableFuture<?> abort() {
+ return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
+ }
}
-
- private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
- extends CohortBehaviour<M> {
-
+ private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
+ N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
private final S step;
private final TransactionIdentifier txId;
- CohortStateWithStep(TransactionIdentifier txId, S step) {
- this.txId = Preconditions.checkNotNull(txId);
- this.step = Preconditions.checkNotNull(step);
+ CohortStateWithStep(Class<M> handledMessageType, TransactionIdentifier txId, S step) {
+ super(handledMessageType);
+ this.txId = Objects.requireNonNull(txId);
+ this.step = Objects.requireNonNull(step);
}
final S getStep() {
return step;
}
- final TransactionIdentifier getTxId() {
- return txId;
+ @Override
+ ListenableFuture<?> abort() {
+ return getStep().abort();
}
@Override
- final CohortBehaviour<?> abort() {
- try {
- getStep().abort().get();
- } catch (final Exception e) {
- LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
- getSender().tell(new Status.Failure(e), getSelf());
- return idleState;
- }
- getSender().tell(new Success(getSelf(), txId), getSelf());
- return idleState;
+ public String toString() {
+ return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
}
-
}
- private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
+ private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
- super(txId, nextStep);
+ super(PreCommit.class, txId, nextStep);
}
+ @SuppressWarnings("unchecked")
@Override
- Class<PreCommit> getHandledMessageType() {
- return PreCommit.class;
+ ListenableFuture<PostPreCommitStep> process(PreCommit message) {
+ return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
}
@Override
- CohortBehaviour<?> process(PreCommit message) {
- final PostPreCommitStep nextStep;
- try {
- nextStep = getStep().preCommit().get();
- } catch (final Exception e) {
- getSender().tell(new Status.Failure(e), getSelf());
- return idleState;
- }
- getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
- return new PostPreCommit(getTxId(), nextStep);
+ CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostPreCommitStep nextStep) {
+ return new PostPreCommit(txId, nextStep);
}
}
- private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
+ private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
- super(txId, step);
+ super(Commit.class, txId, step);
}
+ @SuppressWarnings("unchecked")
@Override
- CohortBehaviour<?> process(Commit message) {
- try {
- getStep().commit().get();
- } catch (final Exception e) {
- getSender().tell(new Status.Failure(e), getSender());
- return idleState;
- }
- getSender().tell(new Success(getSelf(), getTxId()), getSelf());
- return idleState;
+ ListenableFuture<NoopThreePhaseCommitStep> process(Commit message) {
+ return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
}
@Override
- Class<Commit> getHandledMessageType() {
- return Commit.class;
+ CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, NoopThreePhaseCommitStep nextStep) {
+ return null;
}
+ }
+ private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
}
- static Props props(final DOMDataTreeCommitCohort cohort) {
- return Props.create(DataTreeCohortActor.class, cohort);
+ static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
+ return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
}
}