Bug 8231: Fix testChangeListenerRegistration failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
index 8115473a0565fb2c3865f4bc03e7216a8029a579..4044a4cb3a0ec991ca429a1d5c3dac63db426768 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.Status.Failure;
 import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
 import akka.dispatch.Recover;
-import akka.japi.Function;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
@@ -29,18 +28,20 @@ import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanComm
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
 /**
- *
  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
- *
+ * <p/>
  * It tracks current operation and list of cohorts which successfuly finished previous phase in
  * case, if abort is necessary to invoke it only on cohort steps which are still active.
  *
  */
 class CompositeDataTreeCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
 
     private enum State {
         /**
@@ -72,14 +73,11 @@ class CompositeDataTreeCohort {
          */
         COMMITED,
         /**
-         * Some of cohorts responsed back with unsuccessful message.
-         *
+         * Some of cohorts responded back with unsuccessful message.
          */
         FAILED,
         /**
-         *
          * Abort message was send to all cohorts which responded with success previously.
-         *
          */
         ABORTED
     }
@@ -107,22 +105,41 @@ class CompositeDataTreeCohort {
         this.timeout = Preconditions.checkNotNull(timeout);
     }
 
+    void reset() {
+        switch (state) {
+            case CAN_COMMIT_SENT:
+            case CAN_COMMIT_SUCCESSFUL:
+            case PRE_COMMIT_SENT:
+            case PRE_COMMIT_SUCCESSFUL:
+            case COMMIT_SENT:
+                abort();
+                break;
+            default :
+                break;
+        }
+
+        successfulFromPrevious = null;
+        state = State.IDLE;
+    }
+
     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+        LOG.debug("{}: canCommit -  candidate: {}", txId, tip);
+
         Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+
+        LOG.debug("{}: canCommit - messages: {}", txId, messages);
+
         // FIXME: Optimize empty collection list with pre-created futures, containing success.
-        Future<Iterable<Object>> canCommitsFuture =
-                Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
-                    @Override
-                    public Future<Object> apply(final CanCommit input) {
-                        return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
-                                ExecutionContexts.global());
-                    }
-                }, ExecutionContexts.global());
+        Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
+            input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
+                    ExecutionContexts.global()), ExecutionContexts.global());
         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
         processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
     }
 
     void preCommit() throws ExecutionException, TimeoutException {
+        LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
         Preconditions.checkState(successfulFromPrevious != null);
         Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
@@ -130,6 +147,8 @@ class CompositeDataTreeCohort {
     }
 
     void commit() throws ExecutionException, TimeoutException {
+        LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
         Preconditions.checkState(successfulFromPrevious != null);
         Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
@@ -137,7 +156,10 @@ class CompositeDataTreeCohort {
     }
 
     Optional<Future<Iterable<Object>>> abort() {
-        if (successfulFromPrevious != null) {
+        LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
+        state = State.ABORTED;
+        if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
             return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
         }
 
@@ -145,28 +167,31 @@ class CompositeDataTreeCohort {
     }
 
     private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
-        return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
-
-            @Override
-            public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
-                return Patterns.ask(cohortResponse.getCohort(), message, timeout);
-            }
+        LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
 
-        }, ExecutionContexts.global());
+        return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
+                cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
     }
 
-    private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
-            throws TimeoutException, ExecutionException {
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+            final State afterState) throws TimeoutException, ExecutionException {
+        LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+
         final Iterable<Object> results;
         try {
             results = Await.result(resultsFuture, timeout.duration());
         } catch (Exception e) {
             successfulFromPrevious = null;
+            LOG.debug("{}: processResponses - error from Future", txId, e);
             Throwables.propagateIfInstanceOf(e, TimeoutException.class);
             throw Throwables.propagate(e);
         }
         Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
         Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
+
+        LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
+
         successfulFromPrevious = successful;
         if (!Iterables.isEmpty(failed)) {
             changeStateFrom(currentState, State.FAILED);