Adjust to DOMDataTreeCommitCohort API change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
index e6ff10d8315cd47da347ae4764d20c7b825b51eb..26c5f45568e2ca00ea1205a27175504dbe90cfc7 100644 (file)
@@ -11,7 +11,17 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status;
-import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
@@ -27,21 +37,31 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * decapsulating DataTreeChanged messages and dispatching their context to the user.
  */
 final class DataTreeCohortActor extends AbstractUntypedActor {
-    private final CohortBehaviour<?> idleState = new Idle();
+    private final Idle idleState = new Idle();
     private final DOMDataTreeCommitCohort cohort;
     private final YangInstanceIdentifier registeredPath;
-    private CohortBehaviour<?> currentState = idleState;
+    private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
 
     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
-        this.cohort = Preconditions.checkNotNull(cohort);
-        this.registeredPath = Preconditions.checkNotNull(registeredPath);
+        this.cohort = Objects.requireNonNull(cohort);
+        this.registeredPath = Objects.requireNonNull(registeredPath);
     }
 
     @Override
     protected void handleReceive(final Object message) {
-        currentState = currentState.handle(message);
-    }
+        if (!(message instanceof CommitProtocolCommand)) {
+            unknownMessage(message);
+            return;
+        }
+
+        CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
+        CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
 
+        LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
+                currentState, message);
+
+        currentState.handle(command);
+    }
 
     /**
      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
@@ -57,25 +77,31 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
         }
 
         protected CommitProtocolCommand(TransactionIdentifier txId) {
-            this.txId = Preconditions.checkNotNull(txId);
+            this.txId = Objects.requireNonNull(txId);
+        }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName() + " [txId=" + txId + "]";
         }
     }
 
     static final class CanCommit extends CommitProtocolCommand<Success> {
 
-        private final DOMDataTreeCandidate candidate;
+        private final Collection<DOMDataTreeCandidate> candidates;
         private final ActorRef cohort;
         private final SchemaContext schema;
 
-        CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
+        CanCommit(TransactionIdentifier txId, Collection<DOMDataTreeCandidate> candidates, SchemaContext schema,
+                ActorRef cohort) {
             super(txId);
-            this.cohort = Preconditions.checkNotNull(cohort);
-            this.candidate = Preconditions.checkNotNull(candidate);
-            this.schema = Preconditions.checkNotNull(schema);
+            this.cohort = Objects.requireNonNull(cohort);
+            this.candidates = Objects.requireNonNull(candidates);
+            this.schema = Objects.requireNonNull(schema);
         }
 
-        DOMDataTreeCandidate getCandidate() {
-            return candidate;
+        Collection<DOMDataTreeCandidate> getCandidates() {
+            return candidates;
         }
 
         SchemaContext getSchema() {
@@ -86,6 +112,10 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
             return cohort;
         }
 
+        @Override
+        public String toString() {
+            return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort  + "]";
+        }
     }
 
     abstract static class CommitReply {
@@ -94,8 +124,8 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
         private final TransactionIdentifier txId;
 
         protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
-            this.cohortRef = Preconditions.checkNotNull(cohortRef);
-            this.txId = Preconditions.checkNotNull(txId);
+            this.cohortRef = Objects.requireNonNull(cohortRef);
+            this.txId = Objects.requireNonNull(txId);
         }
 
         ActorRef getCohort() {
@@ -105,6 +135,11 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
         final TransactionIdentifier getTxId() {
             return txId;
         }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
+        }
     }
 
     static final class Success extends CommitReply {
@@ -112,7 +147,6 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
         Success(ActorRef cohortRef, TransactionIdentifier txId) {
             super(cohortRef, txId);
         }
-
     }
 
     static final class PreCommit extends CommitProtocolCommand<Success> {
@@ -136,141 +170,170 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
         }
     }
 
-    private abstract static class CohortBehaviour<E> {
+    private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
+        private final Class<M> handledMessageType;
 
-        abstract Class<E> getHandledMessageType();
+        CohortBehaviour(Class<M> handledMessageType) {
+            this.handledMessageType = Objects.requireNonNull(handledMessageType);
+        }
 
-        CohortBehaviour<?> handle(Object message) {
-            if (getHandledMessageType().isInstance(message)) {
-                return process(getHandledMessageType().cast(message));
-            } else if (message instanceof Abort) {
-                return abort();
+        void handle(CommitProtocolCommand<?> command) {
+            if (handledMessageType.isInstance(command)) {
+                onMessage(command);
+            } else if (command instanceof Abort) {
+                onAbort(((Abort)command).getTxId());
+            } else {
+                getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
+                        "Unexpected message %s in cohort behavior %s", command.getClass(),
+                        getClass().getSimpleName()))), getSelf());
             }
-            throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
-                    message.getClass(), getClass().getSimpleName()));
         }
 
-        abstract CohortBehaviour<?> abort();
+        private void onMessage(CommitProtocolCommand<?> message) {
+            final ActorRef sender = getSender();
+            TransactionIdentifier txId = message.getTxId();
+            ListenableFuture<S> future = process(handledMessageType.cast(message));
+            Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
+                    : DataTreeCohortActor.this::executeInSelf;
+            Futures.addCallback(future, new FutureCallback<S>() {
+                @Override
+                public void onSuccess(S nextStep) {
+                    success(txId, sender, nextStep);
+                }
+
+                @Override
+                public void onFailure(Throwable failure) {
+                    failed(txId, sender, failure);
+                }
+            }, callbackExecutor);
+        }
 
-        abstract CohortBehaviour<?> process(E message);
+        private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) {
+            currentStateMap.remove(txId);
+            sender.tell(new Status.Failure(failure), getSelf());
+        }
 
-    }
+        private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) {
+            currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
+            sender.tell(new Success(getSelf(), txId), getSelf());
+        }
 
-    private class Idle extends CohortBehaviour<CanCommit> {
+        private void onAbort(TransactionIdentifier txId) {
+            currentStateMap.remove(txId);
+            final ActorRef sender = getSender();
+            Futures.addCallback(abort(), new FutureCallback<Object>() {
+                @Override
+                public void onSuccess(Object noop) {
+                    sender.tell(new Success(getSelf(), txId), getSelf());
+                }
+
+                @Override
+                public void onFailure(Throwable failure) {
+                    LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
+                    sender.tell(new Status.Failure(failure), getSelf());
+                }
+            }, MoreExecutors.directExecutor());
+        }
+
+        @Nullable
+        abstract CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
+
+        @Nonnull
+        abstract ListenableFuture<S> process(M command);
+
+        abstract ListenableFuture<?> abort();
 
         @Override
-        Class<CanCommit> getHandledMessageType() {
-            return CanCommit.class;
+        public String toString() {
+            return getClass().getSimpleName();
+        }
+    }
+
+    private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
+        Idle() {
+            super(CanCommit.class);
         }
 
         @Override
-        @SuppressWarnings("checkstyle:IllegalCatch")
-        CohortBehaviour<?> process(CanCommit message) {
-            final PostCanCommitStep nextStep;
-            try {
-                nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
-            } catch (final Exception e) {
-                getSender().tell(new Status.Failure(e), getSelf());
-                return this;
-            }
-            getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
-            return new PostCanCommit(message.getTxId(), nextStep);
+        ListenableFuture<PostCanCommitStep> process(CanCommit message) {
+            return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
         }
 
         @Override
-        CohortBehaviour<?> abort() {
-            return this;
+        CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostCanCommitStep nextStep) {
+            return new PostCanCommit(txId, nextStep);
         }
 
+        @Override
+        ListenableFuture<?> abort() {
+            return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
+        }
     }
 
-
-    private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
-            extends CohortBehaviour<M> {
-
+    private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
+            N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
         private final S step;
         private final TransactionIdentifier txId;
 
-        CohortStateWithStep(TransactionIdentifier txId, S step) {
-            this.txId = Preconditions.checkNotNull(txId);
-            this.step = Preconditions.checkNotNull(step);
+        CohortStateWithStep(Class<M> handledMessageType, TransactionIdentifier txId, S step) {
+            super(handledMessageType);
+            this.txId = Objects.requireNonNull(txId);
+            this.step = Objects.requireNonNull(step);
         }
 
         final S getStep() {
             return step;
         }
 
-        final TransactionIdentifier getTxId() {
-            return txId;
+        @Override
+        ListenableFuture<?> abort() {
+            return getStep().abort();
         }
 
         @Override
-        @SuppressWarnings("checkstyle:IllegalCatch")
-        final CohortBehaviour<?> abort() {
-            try {
-                getStep().abort().get();
-            } catch (final Exception e) {
-                LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
-                getSender().tell(new Status.Failure(e), getSelf());
-                return idleState;
-            }
-            getSender().tell(new Success(getSelf(), txId), getSelf());
-            return idleState;
+        public String toString() {
+            return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
         }
-
     }
 
-    private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
+    private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
 
         PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
-            super(txId, nextStep);
+            super(PreCommit.class, txId, nextStep);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        Class<PreCommit> getHandledMessageType() {
-            return PreCommit.class;
+        ListenableFuture<PostPreCommitStep> process(PreCommit message) {
+            return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
         }
 
         @Override
-        @SuppressWarnings("checkstyle:IllegalCatch")
-        CohortBehaviour<?> process(PreCommit message) {
-            final PostPreCommitStep nextStep;
-            try {
-                nextStep = getStep().preCommit().get();
-            } catch (final Exception e) {
-                getSender().tell(new Status.Failure(e), getSelf());
-                return idleState;
-            }
-            getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
-            return new PostPreCommit(getTxId(), nextStep);
+        CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostPreCommitStep nextStep) {
+            return new PostPreCommit(txId, nextStep);
         }
 
     }
 
-    private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
+    private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
 
         PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
-            super(txId, step);
+            super(Commit.class, txId, step);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        @SuppressWarnings("checkstyle:IllegalCatch")
-        CohortBehaviour<?> process(Commit message) {
-            try {
-                getStep().commit().get();
-            } catch (final Exception e) {
-                getSender().tell(new Status.Failure(e), getSender());
-                return idleState;
-            }
-            getSender().tell(new Success(getSelf(), getTxId()), getSelf());
-            return idleState;
+        ListenableFuture<NoopThreePhaseCommitStep> process(Commit message) {
+            return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
         }
 
         @Override
-        Class<Commit> getHandledMessageType() {
-            return Commit.class;
+        CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, NoopThreePhaseCommitStep nextStep) {
+            return null;
         }
+    }
 
+    private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
     }
 
     static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {